1use std::collections::HashSet;
4use std::sync::Arc;
5
6use crossbeam_queue::ArrayQueue;
7use futures::StreamExt;
8use solana_entry::entry::Entry as SolanaEntry;
9use solana_sdk::message::VersionedMessage;
10use solana_sdk::pubkey::Pubkey;
11use tokio::sync::Mutex;
12use tokio::task::JoinHandle;
13
14use crate::accounts::program_ids::SPL_TOKEN_2022_PROGRAM_ID;
15use crate::core::now_micros;
16use crate::shredstream::config::ShredStreamConfig;
17use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
18use crate::DexEvent;
19
20#[inline]
23fn get_token_program_or_default(token_program: Pubkey) -> Pubkey {
24 if token_program == Pubkey::default() {
25 SPL_TOKEN_2022_PROGRAM_ID
26 } else {
27 token_program
28 }
29}
30
/// Owned copy of one compiled instruction taken from a transaction message.
#[derive(Debug, Clone)]
struct IxRef {
    /// Index of the instruction's program id within the transaction's account-key list.
    program_id_index: u8,
    /// Account-key indices referenced by the instruction.
    accounts: Vec<u8>,
    /// Raw instruction data bytes.
    data: Vec<u8>,
}

impl IxRef {
    /// Builds an `IxRef` from its three parts, taking ownership of both buffers.
    fn new(program_id_index: u8, accounts: Vec<u8>, data: Vec<u8>) -> Self {
        IxRef { program_id_index, accounts, data }
    }
}
49
/// Client that subscribes to a ShredStream proxy endpoint and turns received
/// entries into `DexEvent`s delivered through a bounded queue.
///
/// The type is `Clone`; clones share the same subscription handle.
#[derive(Clone)]
pub struct ShredStreamClient {
    /// Endpoint string handed to `ShredstreamProxyClient::connect`.
    endpoint: String,
    /// Settings read by the background task (`reconnect_delay_ms`, `max_reconnect_attempts`).
    config: ShredStreamConfig,
    /// Handle of the currently running subscription task, or `None` when nothing is running.
    subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
57
58impl ShredStreamClient {
59 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
61 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
62 }
63
64 pub async fn new_with_config(
66 endpoint: impl Into<String>,
67 config: ShredStreamConfig,
68 ) -> crate::common::AnyResult<Self> {
69 let endpoint = endpoint.into();
70 let _ = ShredstreamProxyClient::connect(endpoint.clone()).await?;
72
73 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
74 }
75
76 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
80 self.stop().await;
82
83 let queue = Arc::new(ArrayQueue::new(100_000));
84 let queue_clone = Arc::clone(&queue);
85
86 let endpoint = self.endpoint.clone();
87 let config = self.config.clone();
88
89 let handle = tokio::spawn(async move {
90 let mut delay = config.reconnect_delay_ms;
91 let mut attempts = 0u32;
92
93 loop {
94 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
95 log::error!("Max reconnection attempts reached, giving up");
96 break;
97 }
98 attempts += 1;
99
100 match Self::stream_events(&endpoint, &queue_clone).await {
101 Ok(_) => {
102 delay = config.reconnect_delay_ms;
103 attempts = 0;
104 }
105 Err(e) => {
106 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
107 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
108 delay = (delay * 2).min(60_000);
109 }
110 }
111 }
112 });
113
114 *self.subscription_handle.lock().await = Some(handle);
115 Ok(queue)
116 }
117
118 pub async fn stop(&self) {
120 if let Some(handle) = self.subscription_handle.lock().await.take() {
121 handle.abort();
122 }
123 }
124
125 async fn stream_events(
127 endpoint: &str,
128 queue: &Arc<ArrayQueue<DexEvent>>,
129 ) -> Result<(), String> {
130 let mut client = ShredstreamProxyClient::connect(endpoint.to_string())
131 .await
132 .map_err(|e| e.to_string())?;
133 let request = tonic::Request::new(SubscribeEntriesRequest {});
134 let mut stream =
135 client.subscribe_entries(request).await.map_err(|e| e.to_string())?.into_inner();
136
137 log::info!("ShredStream connected, receiving entries...");
138
139 while let Some(message) = stream.next().await {
140 match message {
141 Ok(entry) => {
142 Self::process_entry(entry, queue);
143 }
144 Err(e) => {
145 log::error!("Stream error: {:?}", e);
146 return Err(e.to_string());
147 }
148 }
149 }
150
151 Ok(())
152 }
153
154 #[inline]
156 fn process_entry(entry: Entry, queue: &Arc<ArrayQueue<DexEvent>>) {
157 let slot = entry.slot;
158 let recv_us = now_micros();
159
160 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
162 Ok(e) => e,
163 Err(e) => {
164 log::debug!("Failed to deserialize entries: {}", e);
165 return;
166 }
167 };
168
169 for entry in entries {
171 for (tx_index, transaction) in entry.transactions.iter().enumerate() {
172 Self::process_transaction(transaction, slot, recv_us, tx_index as u64, queue);
173 }
174 }
175 }
176
177 #[inline]
179 fn process_transaction(
180 transaction: &solana_sdk::transaction::VersionedTransaction,
181 slot: u64,
182 recv_us: i64,
183 tx_index: u64,
184 queue: &Arc<ArrayQueue<DexEvent>>,
185 ) {
186 if transaction.signatures.is_empty() {
187 return;
188 }
189
190 let signature = transaction.signatures[0];
191 if let VersionedMessage::V0(m) = &transaction.message {
192 if !m.address_table_lookups.is_empty() {
193 log::debug!(
194 target: "sol_parser_sdk::shredstream",
195 "V0 tx uses address lookup tables; only static keys are available — \
196 some instruction account indices may resolve to wrong pubkeys (often only 1 BUY gets is_created_buy)"
197 );
198 }
199 }
200 let accounts: Vec<Pubkey> = transaction.message.static_account_keys().to_vec();
201
202 let mut events = Vec::new();
204 Self::parse_transaction_instructions(
205 transaction,
206 &accounts,
207 signature,
208 slot,
209 tx_index,
210 recv_us,
211 &mut events,
212 );
213 crate::core::pumpfun_fee_enrich::enrich_create_v2_observed_fee_recipient(&mut events);
214
215 for mut event in events {
217 if let Some(meta) = event.metadata_mut() {
219 meta.grpc_recv_us = recv_us;
220 }
221 let _ = queue.push(event);
222 }
223 }
224
225 #[inline]
227 fn parse_transaction_instructions(
228 transaction: &solana_sdk::transaction::VersionedTransaction,
229 accounts: &[solana_sdk::pubkey::Pubkey],
230 signature: solana_sdk::signature::Signature,
231 slot: u64,
232 tx_index: u64,
233 recv_us: i64,
234 events: &mut Vec<DexEvent>,
235 ) {
236 use solana_sdk::message::VersionedMessage;
237
238 let message = &transaction.message;
239
240 let instructions: Vec<IxRef> = match message {
242 VersionedMessage::Legacy(msg) => {
243 msg.instructions.iter().map(|ix| IxRef::new(ix.program_id_index, ix.accounts.clone(), ix.data.clone())).collect()
244 }
245 VersionedMessage::V0(msg) => {
246 msg.instructions.iter().map(|ix| IxRef::new(ix.program_id_index, ix.accounts.clone(), ix.data.clone())).collect()
247 }
248 };
249
250 let (created_mints, mayhem_mints) = Self::detect_pumpfun_create_mints(&instructions, accounts);
252
253 for ix in &instructions {
255 let program_id = accounts.get(ix.program_id_index as usize);
256
257 if let Some(program_id) = program_id {
259 if *program_id == crate::instr::pump::PROGRAM_ID_PUBKEY {
260 if let Some(event) = Self::parse_pumpfun_instruction(
261 &ix.data,
262 accounts,
263 &ix.accounts,
264 signature,
265 slot,
266 tx_index,
267 recv_us,
268 &created_mints,
269 &mayhem_mints,
270 ) {
271 events.push(event);
272 }
273 }
274 }
275 }
276 }
277
278 #[inline]
287 fn detect_pumpfun_create_mints(
288 instructions: &[IxRef],
289 accounts: &[Pubkey],
290 ) -> (HashSet<Pubkey>, HashSet<Pubkey>) {
291 use crate::instr::pump::discriminators;
292
293 let mut created_mints = HashSet::new();
294 let mut mayhem_mints = HashSet::new();
295
296 for ix in instructions {
297 if let Some(program_id) = accounts.get(ix.program_id_index as usize) {
298 if *program_id == crate::instr::pump::PROGRAM_ID_PUBKEY {
299 if ix.data.len() >= 8 {
300 let disc: [u8; 8] = ix.data[0..8].try_into().unwrap_or_default();
301 if disc == discriminators::CREATE || disc == discriminators::CREATE_V2 {
302 if let Some(&mint_idx) = ix.accounts.get(0) {
304 if let Some(&mint) = accounts.get(mint_idx as usize) {
305 created_mints.insert(mint);
306
307 if disc == discriminators::CREATE_V2 {
308 let is_mayhem = crate::instr::utils::parse_create_v2_tail_fields(
309 &ix.data[8..],
310 )
311 .map(|(_, m, _)| m)
312 .unwrap_or(false);
313 if is_mayhem {
314 mayhem_mints.insert(mint);
315 }
316 }
317 }
318 }
319 }
320 }
321 }
322 }
323 }
324 (created_mints, mayhem_mints)
325 }
326
327 #[inline]
329 fn parse_pumpfun_instruction(
330 data: &[u8],
331 accounts: &[Pubkey],
332 ix_accounts: &[u8],
333 signature: solana_sdk::signature::Signature,
334 slot: u64,
335 tx_index: u64,
336 recv_us: i64,
337 created_mints: &HashSet<Pubkey>,
338 mayhem_mints: &HashSet<Pubkey>,
339 ) -> Option<DexEvent> {
340 use crate::instr::pump::discriminators;
341 use crate::instr::utils::*;
342
343 if data.len() < 8 {
344 return None;
345 }
346
347 let disc: [u8; 8] = data[0..8].try_into().ok()?;
348 let ix_data = &data[8..];
349
350 let get_account = |idx: usize| -> Option<Pubkey> {
352 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
353 };
354
355 match disc {
356 d if d == discriminators::CREATE => {
358 Self::parse_create_instruction(data, accounts, ix_accounts, signature, slot, tx_index, recv_us)
359 }
360 d if d == discriminators::CREATE_V2 => {
362 Self::parse_create_v2_instruction(data, accounts, ix_accounts, signature, slot, tx_index, recv_us)
363 }
364 d if d == discriminators::BUY => {
366 Self::parse_buy_instruction(
367 ix_data,
368 accounts,
369 ix_accounts,
370 signature,
371 slot,
372 tx_index,
373 recv_us,
374 created_mints,
375 mayhem_mints,
376 )
377 }
378 d if d == discriminators::SELL => {
380 Self::parse_sell_instruction(ix_data, accounts, ix_accounts, signature, slot, tx_index, recv_us)
381 }
382 d if d == discriminators::BUY_EXACT_SOL_IN => {
384 Self::parse_buy_exact_sol_in_instruction(
385 ix_data,
386 accounts,
387 ix_accounts,
388 signature,
389 slot,
390 tx_index,
391 recv_us,
392 created_mints,
393 mayhem_mints,
394 )
395 }
396 _ => None,
397 }
398 }
399
400 #[inline]
406 fn parse_create_instruction(
407 data: &[u8],
408 accounts: &[solana_sdk::pubkey::Pubkey],
409 ix_accounts: &[u8],
410 signature: solana_sdk::signature::Signature,
411 slot: u64,
412 tx_index: u64,
413 recv_us: i64,
414 ) -> Option<DexEvent> {
415 use crate::instr::utils::*;
416 use crate::core::events::*;
417
418 if ix_accounts.len() < 10 {
420 return None;
421 }
422
423 let get_account = |idx: usize| -> Option<solana_sdk::pubkey::Pubkey> {
424 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
425 };
426
427 let mut offset = 8; let name = if let Some((s, len)) = read_str_unchecked(data, offset) {
431 offset += len;
432 s.to_string()
433 } else {
434 String::new()
435 };
436
437 let symbol = if let Some((s, len)) = read_str_unchecked(data, offset) {
439 offset += len;
440 s.to_string()
441 } else {
442 String::new()
443 };
444
445 let uri = if let Some((s, len)) = read_str_unchecked(data, offset) {
447 offset += len;
448 s.to_string()
449 } else {
450 String::new()
451 };
452
453 let creator = if offset + 32 <= data.len() {
455 read_pubkey(data, offset).unwrap_or_default()
456 } else {
457 solana_sdk::pubkey::Pubkey::default()
458 };
459
460 let mint = get_account(0)?;
462 let bonding_curve = get_account(2).unwrap_or_default();
463 let user = get_account(7).unwrap_or_default();
464
465 let metadata = EventMetadata {
466 signature,
467 slot,
468 tx_index,
469 block_time_us: 0, grpc_recv_us: recv_us,
471 recent_blockhash: None,
472 };
473
474 Some(DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
475 metadata,
476 name,
477 symbol,
478 uri,
479 mint,
480 bonding_curve,
481 user,
482 creator,
483 token_program: get_account(9).unwrap_or_default(),
484 ..Default::default()
485 }))
486 }
487
488 #[inline]
494 fn parse_create_v2_instruction(
495 data: &[u8],
496 accounts: &[solana_sdk::pubkey::Pubkey],
497 ix_accounts: &[u8],
498 signature: solana_sdk::signature::Signature,
499 slot: u64,
500 tx_index: u64,
501 recv_us: i64,
502 ) -> Option<DexEvent> {
503 use crate::instr::utils::*;
504 use crate::core::events::*;
505
506 const CREATE_V2_MIN_ACCOUNTS: usize = 16;
507 if ix_accounts.len() < CREATE_V2_MIN_ACCOUNTS {
508 return None;
509 }
510
511 let get_account = |idx: usize| -> Option<solana_sdk::pubkey::Pubkey> {
512 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
513 };
514
515 let payload = &data[8..];
516 let mut offset = 0usize;
517 let name = if let Some((s, len)) = read_str_unchecked(payload, offset) {
518 offset += len;
519 s.to_string()
520 } else {
521 String::new()
522 };
523 let symbol = if let Some((s, len)) = read_str_unchecked(payload, offset) {
524 offset += len;
525 s.to_string()
526 } else {
527 String::new()
528 };
529 let uri = if let Some((s, len)) = read_str_unchecked(payload, offset) {
530 offset += len;
531 s.to_string()
532 } else {
533 String::new()
534 };
535 if payload.len() < offset + 32 + 1 {
536 return None;
537 }
538 let creator = read_pubkey(payload, offset).unwrap_or_default();
539 offset += 32;
540 let is_mayhem_mode = read_bool(payload, offset).unwrap_or(false);
541 offset += 1;
542 let is_cashback_enabled = read_bool(payload, offset).unwrap_or(false);
543
544 let mint = get_account(0)?;
545 let bonding_curve = get_account(2).unwrap_or_default();
546 let user = get_account(5).unwrap_or_default();
547
548 let metadata = EventMetadata {
549 signature,
550 slot,
551 tx_index,
552 block_time_us: 0,
553 grpc_recv_us: recv_us,
554 recent_blockhash: None,
555 };
556
557 let mayhem_program_id = get_account(9).unwrap_or_default();
558
559 Some(DexEvent::PumpFunCreateV2(PumpFunCreateV2TokenEvent {
560 metadata,
561 name,
562 symbol,
563 uri,
564 mint,
565 bonding_curve,
566 user,
567 creator,
568 mint_authority: get_account(1).unwrap_or_default(),
569 associated_bonding_curve: get_account(3).unwrap_or_default(),
570 global: get_account(4).unwrap_or_default(),
571 system_program: get_account(6).unwrap_or_default(),
572 token_program: get_account(7).unwrap_or_default(),
573 associated_token_program: get_account(8).unwrap_or_default(),
574 mayhem_program_id,
575 global_params: get_account(10).unwrap_or_default(),
576 sol_vault: get_account(11).unwrap_or_default(),
577 mayhem_state: get_account(12).unwrap_or_default(),
578 mayhem_token_vault: get_account(13).unwrap_or_default(),
579 event_authority: get_account(14).unwrap_or_default(),
580 program: get_account(15).unwrap_or_default(),
581 is_mayhem_mode,
582 is_cashback_enabled,
583 ..Default::default()
584 }))
585 }
586
587 #[inline]
589 fn parse_buy_instruction(
590 data: &[u8],
591 accounts: &[Pubkey],
592 ix_accounts: &[u8],
593 signature: solana_sdk::signature::Signature,
594 slot: u64,
595 tx_index: u64,
596 recv_us: i64,
597 created_mints: &HashSet<Pubkey>,
598 mayhem_mints: &HashSet<Pubkey>,
599 ) -> Option<DexEvent> {
600 use crate::instr::utils::*;
601 use crate::core::events::*;
602
603 if ix_accounts.len() < 7 {
604 return None;
605 }
606
607 let get_account = |idx: usize| -> Option<Pubkey> {
608 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
609 };
610
611 let (token_amount, sol_amount) = if data.len() >= 16 {
613 (read_u64_le(data, 0).unwrap_or(0), read_u64_le(data, 8).unwrap_or(0))
614 } else {
615 (0, 0)
616 };
617
618 let mint = get_account(2)?;
619
620 let is_created_buy = created_mints.contains(&mint);
622
623 let is_mayhem_mode = mayhem_mints.contains(&mint);
625
626 let metadata = EventMetadata {
627 signature,
628 slot,
629 tx_index,
630 block_time_us: 0,
631 grpc_recv_us: recv_us,
632 recent_blockhash: None,
633 };
634
635 Some(DexEvent::PumpFunTrade(PumpFunTradeEvent {
636 metadata,
637 mint,
638 bonding_curve: get_account(3).unwrap_or_default(),
639 user: get_account(6).unwrap_or_default(),
640 sol_amount,
641 token_amount,
642 fee_recipient: get_account(1).unwrap_or_default(),
643 is_buy: true,
644 is_created_buy,
645 timestamp: 0,
646 virtual_sol_reserves: 0,
647 virtual_token_reserves: 0,
648 real_sol_reserves: 0,
649 real_token_reserves: 0,
650 fee_basis_points: 0,
651 fee: 0,
652 creator: Pubkey::default(),
653 creator_fee_basis_points: 0,
654 creator_fee: 0,
655 track_volume: false,
656 total_unclaimed_tokens: 0,
657 total_claimed_tokens: 0,
658 current_sol_volume: 0,
659 last_update_timestamp: 0,
660 ix_name: "buy".to_string(),
661 mayhem_mode: is_mayhem_mode,
662 cashback_fee_basis_points: 0,
663 cashback: 0,
664 is_cashback_coin: false,
665 associated_bonding_curve: get_account(4).unwrap_or_default(),
666 token_program: get_token_program_or_default(get_account(8).unwrap_or_default()),
667 creator_vault: get_account(9).unwrap_or_default(),
668 account: None,
669 }))
670 }
671
672 #[inline]
674 fn parse_sell_instruction(
675 data: &[u8],
676 accounts: &[solana_sdk::pubkey::Pubkey],
677 ix_accounts: &[u8],
678 signature: solana_sdk::signature::Signature,
679 slot: u64,
680 tx_index: u64,
681 recv_us: i64,
682 ) -> Option<DexEvent> {
683 use crate::instr::utils::*;
684 use crate::core::events::*;
685
686 if ix_accounts.len() < 7 {
687 return None;
688 }
689
690 let get_account = |idx: usize| -> Option<solana_sdk::pubkey::Pubkey> {
691 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
692 };
693
694 let (token_amount, sol_amount) = if data.len() >= 16 {
696 (read_u64_le(data, 0).unwrap_or(0), read_u64_le(data, 8).unwrap_or(0))
697 } else {
698 (0, 0)
699 };
700
701 let mint = get_account(2)?;
702 let metadata = EventMetadata {
703 signature,
704 slot,
705 tx_index,
706 block_time_us: 0,
707 grpc_recv_us: recv_us,
708 recent_blockhash: None,
709 };
710
711 Some(DexEvent::PumpFunTrade(PumpFunTradeEvent {
712 metadata,
713 mint,
714 bonding_curve: get_account(3).unwrap_or_default(),
715 user: get_account(6).unwrap_or_default(),
716 sol_amount,
717 token_amount,
718 fee_recipient: get_account(1).unwrap_or_default(),
719 is_buy: false,
720 is_created_buy: false,
721 timestamp: 0,
722 virtual_sol_reserves: 0,
723 virtual_token_reserves: 0,
724 real_sol_reserves: 0,
725 real_token_reserves: 0,
726 fee_basis_points: 0,
727 fee: 0,
728 creator: Pubkey::default(),
729 creator_fee_basis_points: 0,
730 creator_fee: 0,
731 track_volume: false,
732 total_unclaimed_tokens: 0,
733 total_claimed_tokens: 0,
734 current_sol_volume: 0,
735 last_update_timestamp: 0,
736 ix_name: "sell".to_string(),
737 mayhem_mode: false,
738 cashback_fee_basis_points: 0,
739 cashback: 0,
740 is_cashback_coin: false,
741 associated_bonding_curve: get_account(4).unwrap_or_default(),
742 token_program: get_token_program_or_default(get_account(9).unwrap_or_default()),
743 creator_vault: get_account(8).unwrap_or_default(),
744 account: None,
745 }))
746 }
747
748 #[inline]
750 fn parse_buy_exact_sol_in_instruction(
751 data: &[u8],
752 accounts: &[Pubkey],
753 ix_accounts: &[u8],
754 signature: solana_sdk::signature::Signature,
755 slot: u64,
756 tx_index: u64,
757 recv_us: i64,
758 created_mints: &HashSet<Pubkey>,
759 mayhem_mints: &HashSet<Pubkey>,
760 ) -> Option<DexEvent> {
761 use crate::instr::utils::*;
762 use crate::core::events::*;
763
764 if ix_accounts.len() < 7 {
765 return None;
766 }
767
768 let get_account = |idx: usize| -> Option<Pubkey> {
769 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
770 };
771
772 let (sol_amount, token_amount) = if data.len() >= 16 {
774 (read_u64_le(data, 0).unwrap_or(0), read_u64_le(data, 8).unwrap_or(0))
775 } else {
776 (0, 0)
777 };
778
779 let mint = get_account(2)?;
780
781 let is_created_buy = created_mints.contains(&mint);
783
784 let is_mayhem_mode = mayhem_mints.contains(&mint);
786
787 let metadata = EventMetadata {
788 signature,
789 slot,
790 tx_index,
791 block_time_us: 0,
792 grpc_recv_us: recv_us,
793 recent_blockhash: None,
794 };
795
796 Some(DexEvent::PumpFunTrade(PumpFunTradeEvent {
797 metadata,
798 mint,
799 bonding_curve: get_account(3).unwrap_or_default(),
800 user: get_account(6).unwrap_or_default(),
801 sol_amount,
802 token_amount,
803 fee_recipient: get_account(1).unwrap_or_default(),
804 is_buy: true,
805 is_created_buy,
806 timestamp: 0,
807 virtual_sol_reserves: 0,
808 virtual_token_reserves: 0,
809 real_sol_reserves: 0,
810 real_token_reserves: 0,
811 fee_basis_points: 0,
812 fee: 0,
813 creator: Pubkey::default(),
814 creator_fee_basis_points: 0,
815 creator_fee: 0,
816 track_volume: false,
817 total_unclaimed_tokens: 0,
818 total_claimed_tokens: 0,
819 current_sol_volume: 0,
820 last_update_timestamp: 0,
821 ix_name: "buy_exact_sol_in".to_string(),
822 mayhem_mode: is_mayhem_mode,
823 cashback_fee_basis_points: 0,
824 cashback: 0,
825 is_cashback_coin: false,
826 associated_bonding_curve: get_account(4).unwrap_or_default(),
827 token_program: get_token_program_or_default(get_account(8).unwrap_or_default()),
828 creator_vault: get_account(9).unwrap_or_default(),
829 account: None,
830 }))
831 }
832}