1use std::collections::HashSet;
4use std::sync::Arc;
5
6use crossbeam_queue::ArrayQueue;
7use futures::StreamExt;
8use solana_entry::entry::Entry as SolanaEntry;
9use solana_sdk::pubkey::Pubkey;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12
13use crate::accounts::program_ids::SPL_TOKEN_2022_PROGRAM_ID;
14use crate::core::now_micros;
15use crate::shredstream::config::ShredStreamConfig;
16use crate::shredstream::proto::{Entry, ShredstreamProxyClient, SubscribeEntriesRequest};
17use crate::DexEvent;
18
19#[inline]
22fn get_token_program_or_default(token_program: Pubkey) -> Pubkey {
23 if token_program == Pubkey::default() {
24 SPL_TOKEN_2022_PROGRAM_ID
25 } else {
26 token_program
27 }
28}
29
30#[derive(Debug, Clone)]
33struct IxRef {
34 program_id_index: u8,
35 accounts: Vec<u8>,
36 data: Vec<u8>,
37}
38
39impl IxRef {
40 fn new(program_id_index: u8, accounts: Vec<u8>, data: Vec<u8>) -> Self {
41 Self {
42 program_id_index,
43 accounts,
44 data,
45 }
46 }
47}
48
49#[derive(Clone)]
51pub struct ShredStreamClient {
52 endpoint: String,
53 config: ShredStreamConfig,
54 subscription_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
55}
56
57impl ShredStreamClient {
58 pub async fn new(endpoint: impl Into<String>) -> crate::common::AnyResult<Self> {
60 Self::new_with_config(endpoint, ShredStreamConfig::default()).await
61 }
62
63 pub async fn new_with_config(
65 endpoint: impl Into<String>,
66 config: ShredStreamConfig,
67 ) -> crate::common::AnyResult<Self> {
68 let endpoint = endpoint.into();
69 let _ = ShredstreamProxyClient::connect(endpoint.clone()).await?;
71
72 Ok(Self { endpoint, config, subscription_handle: Arc::new(Mutex::new(None)) })
73 }
74
75 pub async fn subscribe(&self) -> crate::common::AnyResult<Arc<ArrayQueue<DexEvent>>> {
79 self.stop().await;
81
82 let queue = Arc::new(ArrayQueue::new(100_000));
83 let queue_clone = Arc::clone(&queue);
84
85 let endpoint = self.endpoint.clone();
86 let config = self.config.clone();
87
88 let handle = tokio::spawn(async move {
89 let mut delay = config.reconnect_delay_ms;
90 let mut attempts = 0u32;
91
92 loop {
93 if config.max_reconnect_attempts > 0 && attempts >= config.max_reconnect_attempts {
94 log::error!("Max reconnection attempts reached, giving up");
95 break;
96 }
97 attempts += 1;
98
99 match Self::stream_events(&endpoint, &queue_clone).await {
100 Ok(_) => {
101 delay = config.reconnect_delay_ms;
102 attempts = 0;
103 }
104 Err(e) => {
105 log::error!("ShredStream error: {} - retry in {}ms", e, delay);
106 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
107 delay = (delay * 2).min(60_000);
108 }
109 }
110 }
111 });
112
113 *self.subscription_handle.lock().await = Some(handle);
114 Ok(queue)
115 }
116
117 pub async fn stop(&self) {
119 if let Some(handle) = self.subscription_handle.lock().await.take() {
120 handle.abort();
121 }
122 }
123
124 async fn stream_events(
126 endpoint: &str,
127 queue: &Arc<ArrayQueue<DexEvent>>,
128 ) -> Result<(), String> {
129 let mut client = ShredstreamProxyClient::connect(endpoint.to_string())
130 .await
131 .map_err(|e| e.to_string())?;
132 let request = tonic::Request::new(SubscribeEntriesRequest {});
133 let mut stream =
134 client.subscribe_entries(request).await.map_err(|e| e.to_string())?.into_inner();
135
136 log::info!("ShredStream connected, receiving entries...");
137
138 while let Some(message) = stream.next().await {
139 match message {
140 Ok(entry) => {
141 Self::process_entry(entry, queue);
142 }
143 Err(e) => {
144 log::error!("Stream error: {:?}", e);
145 return Err(e.to_string());
146 }
147 }
148 }
149
150 Ok(())
151 }
152
153 #[inline]
155 fn process_entry(entry: Entry, queue: &Arc<ArrayQueue<DexEvent>>) {
156 let slot = entry.slot;
157 let recv_us = now_micros();
158
159 let entries = match bincode::deserialize::<Vec<SolanaEntry>>(&entry.entries) {
161 Ok(e) => e,
162 Err(e) => {
163 log::debug!("Failed to deserialize entries: {}", e);
164 return;
165 }
166 };
167
168 for entry in entries {
170 for (tx_index, transaction) in entry.transactions.iter().enumerate() {
171 Self::process_transaction(transaction, slot, recv_us, tx_index as u64, queue);
172 }
173 }
174 }
175
176 #[inline]
178 fn process_transaction(
179 transaction: &solana_sdk::transaction::VersionedTransaction,
180 slot: u64,
181 recv_us: i64,
182 tx_index: u64,
183 queue: &Arc<ArrayQueue<DexEvent>>,
184 ) {
185 if transaction.signatures.is_empty() {
186 return;
187 }
188
189 let signature = transaction.signatures[0];
190 let accounts: Vec<_> = transaction.message.static_account_keys().to_vec();
191
192 let mut events = Vec::new();
194 Self::parse_transaction_instructions(
195 transaction,
196 &accounts,
197 signature,
198 slot,
199 tx_index,
200 recv_us,
201 &mut events,
202 );
203 crate::core::pumpfun_fee_enrich::enrich_create_v2_observed_fee_recipient(&mut events);
204
205 for mut event in events {
207 if let Some(meta) = event.metadata_mut() {
209 meta.grpc_recv_us = recv_us;
210 }
211 let _ = queue.push(event);
212 }
213 }
214
215 #[inline]
217 fn parse_transaction_instructions(
218 transaction: &solana_sdk::transaction::VersionedTransaction,
219 accounts: &[solana_sdk::pubkey::Pubkey],
220 signature: solana_sdk::signature::Signature,
221 slot: u64,
222 tx_index: u64,
223 recv_us: i64,
224 events: &mut Vec<DexEvent>,
225 ) {
226 use solana_sdk::message::VersionedMessage;
227
228 let message = &transaction.message;
229
230 let instructions: Vec<IxRef> = match message {
232 VersionedMessage::Legacy(msg) => {
233 msg.instructions.iter().map(|ix| IxRef::new(ix.program_id_index, ix.accounts.clone(), ix.data.clone())).collect()
234 }
235 VersionedMessage::V0(msg) => {
236 msg.instructions.iter().map(|ix| IxRef::new(ix.program_id_index, ix.accounts.clone(), ix.data.clone())).collect()
237 }
238 };
239
240 let (created_mints, mayhem_mints) = Self::detect_pumpfun_create_mints(&instructions, accounts);
242
243 for ix in &instructions {
245 let program_id = accounts.get(ix.program_id_index as usize);
246
247 if let Some(program_id) = program_id {
249 if *program_id == crate::instr::pump::PROGRAM_ID_PUBKEY {
250 if let Some(event) = Self::parse_pumpfun_instruction(
251 &ix.data,
252 accounts,
253 &ix.accounts,
254 signature,
255 slot,
256 tx_index,
257 recv_us,
258 &created_mints,
259 &mayhem_mints,
260 ) {
261 events.push(event);
262 }
263 }
264 }
265 }
266 }
267
268 #[inline]
277 fn detect_pumpfun_create_mints(
278 instructions: &[IxRef],
279 accounts: &[Pubkey],
280 ) -> (HashSet<Pubkey>, HashSet<Pubkey>) {
281 use crate::instr::pump::discriminators;
282
283 let mut created_mints = HashSet::new();
284 let mut mayhem_mints = HashSet::new();
285
286 for ix in instructions {
287 if let Some(program_id) = accounts.get(ix.program_id_index as usize) {
288 if *program_id == crate::instr::pump::PROGRAM_ID_PUBKEY {
289 if ix.data.len() >= 8 {
290 let disc: [u8; 8] = ix.data[0..8].try_into().unwrap_or_default();
291 if disc == discriminators::CREATE || disc == discriminators::CREATE_V2 {
292 if let Some(&mint_idx) = ix.accounts.get(0) {
294 if let Some(&mint) = accounts.get(mint_idx as usize) {
295 created_mints.insert(mint);
296
297 if disc == discriminators::CREATE_V2 {
298 let is_mayhem = crate::instr::utils::parse_create_v2_tail_fields(
299 &ix.data[8..],
300 )
301 .map(|(_, m, _)| m)
302 .unwrap_or(false);
303 if is_mayhem {
304 mayhem_mints.insert(mint);
305 }
306 }
307 }
308 }
309 }
310 }
311 }
312 }
313 }
314 (created_mints, mayhem_mints)
315 }
316
317 #[inline]
319 fn parse_pumpfun_instruction(
320 data: &[u8],
321 accounts: &[Pubkey],
322 ix_accounts: &[u8],
323 signature: solana_sdk::signature::Signature,
324 slot: u64,
325 tx_index: u64,
326 recv_us: i64,
327 created_mints: &HashSet<Pubkey>,
328 mayhem_mints: &HashSet<Pubkey>,
329 ) -> Option<DexEvent> {
330 use crate::instr::pump::discriminators;
331 use crate::instr::utils::*;
332
333 if data.len() < 8 {
334 return None;
335 }
336
337 let disc: [u8; 8] = data[0..8].try_into().ok()?;
338 let ix_data = &data[8..];
339
340 let get_account = |idx: usize| -> Option<Pubkey> {
342 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
343 };
344
345 match disc {
346 d if d == discriminators::CREATE => {
348 Self::parse_create_instruction(data, accounts, ix_accounts, signature, slot, tx_index, recv_us)
349 }
350 d if d == discriminators::CREATE_V2 => {
352 Self::parse_create_v2_instruction(data, accounts, ix_accounts, signature, slot, tx_index, recv_us)
353 }
354 d if d == discriminators::BUY => {
356 Self::parse_buy_instruction(
357 ix_data, accounts, ix_accounts, signature, slot, tx_index, recv_us, created_mints, mayhem_mints,
358 )
359 }
360 d if d == discriminators::SELL => {
362 Self::parse_sell_instruction(ix_data, accounts, ix_accounts, signature, slot, tx_index, recv_us)
363 }
364 d if d == discriminators::BUY_EXACT_SOL_IN => {
366 Self::parse_buy_exact_sol_in_instruction(
367 ix_data, accounts, ix_accounts, signature, slot, tx_index, recv_us, created_mints, mayhem_mints,
368 )
369 }
370 _ => None,
371 }
372 }
373
374 #[inline]
380 fn parse_create_instruction(
381 data: &[u8],
382 accounts: &[solana_sdk::pubkey::Pubkey],
383 ix_accounts: &[u8],
384 signature: solana_sdk::signature::Signature,
385 slot: u64,
386 tx_index: u64,
387 recv_us: i64,
388 ) -> Option<DexEvent> {
389 use crate::instr::utils::*;
390 use crate::core::events::*;
391
392 if ix_accounts.len() < 10 {
394 return None;
395 }
396
397 let get_account = |idx: usize| -> Option<solana_sdk::pubkey::Pubkey> {
398 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
399 };
400
401 let mut offset = 8; let name = if let Some((s, len)) = read_str_unchecked(data, offset) {
405 offset += len;
406 s.to_string()
407 } else {
408 String::new()
409 };
410
411 let symbol = if let Some((s, len)) = read_str_unchecked(data, offset) {
413 offset += len;
414 s.to_string()
415 } else {
416 String::new()
417 };
418
419 let uri = if let Some((s, len)) = read_str_unchecked(data, offset) {
421 offset += len;
422 s.to_string()
423 } else {
424 String::new()
425 };
426
427 let creator = if offset + 32 <= data.len() {
429 read_pubkey(data, offset).unwrap_or_default()
430 } else {
431 solana_sdk::pubkey::Pubkey::default()
432 };
433
434 let mint = get_account(0)?;
436 let bonding_curve = get_account(2).unwrap_or_default();
437 let user = get_account(7).unwrap_or_default();
438
439 let metadata = EventMetadata {
440 signature,
441 slot,
442 tx_index,
443 block_time_us: 0, grpc_recv_us: recv_us,
445 recent_blockhash: None,
446 };
447
448 Some(DexEvent::PumpFunCreate(PumpFunCreateTokenEvent {
449 metadata,
450 name,
451 symbol,
452 uri,
453 mint,
454 bonding_curve,
455 user,
456 creator,
457 token_program: get_account(9).unwrap_or_default(),
458 ..Default::default()
459 }))
460 }
461
462 #[inline]
468 fn parse_create_v2_instruction(
469 data: &[u8],
470 accounts: &[solana_sdk::pubkey::Pubkey],
471 ix_accounts: &[u8],
472 signature: solana_sdk::signature::Signature,
473 slot: u64,
474 tx_index: u64,
475 recv_us: i64,
476 ) -> Option<DexEvent> {
477 use crate::instr::utils::*;
478 use crate::core::events::*;
479
480 const CREATE_V2_MIN_ACCOUNTS: usize = 16;
481 if ix_accounts.len() < CREATE_V2_MIN_ACCOUNTS {
482 return None;
483 }
484
485 let get_account = |idx: usize| -> Option<solana_sdk::pubkey::Pubkey> {
486 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
487 };
488
489 let payload = &data[8..];
490 let mut offset = 0usize;
491 let name = if let Some((s, len)) = read_str_unchecked(payload, offset) {
492 offset += len;
493 s.to_string()
494 } else {
495 String::new()
496 };
497 let symbol = if let Some((s, len)) = read_str_unchecked(payload, offset) {
498 offset += len;
499 s.to_string()
500 } else {
501 String::new()
502 };
503 let uri = if let Some((s, len)) = read_str_unchecked(payload, offset) {
504 offset += len;
505 s.to_string()
506 } else {
507 String::new()
508 };
509 if payload.len() < offset + 32 + 1 {
510 return None;
511 }
512 let creator = read_pubkey(payload, offset).unwrap_or_default();
513 offset += 32;
514 let is_mayhem_mode = read_bool(payload, offset).unwrap_or(false);
515 offset += 1;
516 let is_cashback_enabled = read_bool(payload, offset).unwrap_or(false);
517
518 let mint = get_account(0)?;
519 let bonding_curve = get_account(2).unwrap_or_default();
520 let user = get_account(5).unwrap_or_default();
521
522 let metadata = EventMetadata {
523 signature,
524 slot,
525 tx_index,
526 block_time_us: 0,
527 grpc_recv_us: recv_us,
528 recent_blockhash: None,
529 };
530
531 let mayhem_program_id = get_account(9).unwrap_or_default();
532
533 Some(DexEvent::PumpFunCreateV2(PumpFunCreateV2TokenEvent {
534 metadata,
535 name,
536 symbol,
537 uri,
538 mint,
539 bonding_curve,
540 user,
541 creator,
542 mint_authority: get_account(1).unwrap_or_default(),
543 associated_bonding_curve: get_account(3).unwrap_or_default(),
544 global: get_account(4).unwrap_or_default(),
545 system_program: get_account(6).unwrap_or_default(),
546 token_program: get_account(7).unwrap_or_default(),
547 associated_token_program: get_account(8).unwrap_or_default(),
548 mayhem_program_id,
549 global_params: get_account(10).unwrap_or_default(),
550 sol_vault: get_account(11).unwrap_or_default(),
551 mayhem_state: get_account(12).unwrap_or_default(),
552 mayhem_token_vault: get_account(13).unwrap_or_default(),
553 event_authority: get_account(14).unwrap_or_default(),
554 program: get_account(15).unwrap_or_default(),
555 is_mayhem_mode,
556 is_cashback_enabled,
557 ..Default::default()
558 }))
559 }
560
561 #[inline]
563 fn parse_buy_instruction(
564 data: &[u8],
565 accounts: &[Pubkey],
566 ix_accounts: &[u8],
567 signature: solana_sdk::signature::Signature,
568 slot: u64,
569 tx_index: u64,
570 recv_us: i64,
571 created_mints: &HashSet<Pubkey>,
572 mayhem_mints: &HashSet<Pubkey>,
573 ) -> Option<DexEvent> {
574 use crate::instr::utils::*;
575 use crate::core::events::*;
576
577 if ix_accounts.len() < 7 {
578 return None;
579 }
580
581 let get_account = |idx: usize| -> Option<Pubkey> {
582 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
583 };
584
585 let (token_amount, sol_amount) = if data.len() >= 16 {
587 (read_u64_le(data, 0).unwrap_or(0), read_u64_le(data, 8).unwrap_or(0))
588 } else {
589 (0, 0)
590 };
591
592 let mint = get_account(2)?;
593
594 let is_created_buy = created_mints.contains(&mint);
596
597 let is_mayhem_mode = mayhem_mints.contains(&mint);
599
600 let metadata = EventMetadata {
601 signature,
602 slot,
603 tx_index,
604 block_time_us: 0,
605 grpc_recv_us: recv_us,
606 recent_blockhash: None,
607 };
608
609 Some(DexEvent::PumpFunTrade(PumpFunTradeEvent {
610 metadata,
611 mint,
612 bonding_curve: get_account(3).unwrap_or_default(),
613 user: get_account(6).unwrap_or_default(),
614 sol_amount,
615 token_amount,
616 fee_recipient: get_account(1).unwrap_or_default(),
617 is_buy: true,
618 is_created_buy,
619 timestamp: 0,
620 virtual_sol_reserves: 0,
621 virtual_token_reserves: 0,
622 real_sol_reserves: 0,
623 real_token_reserves: 0,
624 fee_basis_points: 0,
625 fee: 0,
626 creator: Pubkey::default(),
627 creator_fee_basis_points: 0,
628 creator_fee: 0,
629 track_volume: false,
630 total_unclaimed_tokens: 0,
631 total_claimed_tokens: 0,
632 current_sol_volume: 0,
633 last_update_timestamp: 0,
634 ix_name: "buy".to_string(),
635 mayhem_mode: is_mayhem_mode,
636 cashback_fee_basis_points: 0,
637 cashback: 0,
638 is_cashback_coin: false,
639 associated_bonding_curve: get_account(4).unwrap_or_default(),
640 token_program: get_token_program_or_default(get_account(8).unwrap_or_default()),
641 creator_vault: get_account(9).unwrap_or_default(),
642 account: None,
643 }))
644 }
645
646 #[inline]
648 fn parse_sell_instruction(
649 data: &[u8],
650 accounts: &[solana_sdk::pubkey::Pubkey],
651 ix_accounts: &[u8],
652 signature: solana_sdk::signature::Signature,
653 slot: u64,
654 tx_index: u64,
655 recv_us: i64,
656 ) -> Option<DexEvent> {
657 use crate::instr::utils::*;
658 use crate::core::events::*;
659
660 if ix_accounts.len() < 7 {
661 return None;
662 }
663
664 let get_account = |idx: usize| -> Option<solana_sdk::pubkey::Pubkey> {
665 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
666 };
667
668 let (token_amount, sol_amount) = if data.len() >= 16 {
670 (read_u64_le(data, 0).unwrap_or(0), read_u64_le(data, 8).unwrap_or(0))
671 } else {
672 (0, 0)
673 };
674
675 let mint = get_account(2)?;
676 let metadata = EventMetadata {
677 signature,
678 slot,
679 tx_index,
680 block_time_us: 0,
681 grpc_recv_us: recv_us,
682 recent_blockhash: None,
683 };
684
685 Some(DexEvent::PumpFunTrade(PumpFunTradeEvent {
686 metadata,
687 mint,
688 bonding_curve: get_account(3).unwrap_or_default(),
689 user: get_account(6).unwrap_or_default(),
690 sol_amount,
691 token_amount,
692 fee_recipient: get_account(1).unwrap_or_default(),
693 is_buy: false,
694 is_created_buy: false,
695 timestamp: 0,
696 virtual_sol_reserves: 0,
697 virtual_token_reserves: 0,
698 real_sol_reserves: 0,
699 real_token_reserves: 0,
700 fee_basis_points: 0,
701 fee: 0,
702 creator: Pubkey::default(),
703 creator_fee_basis_points: 0,
704 creator_fee: 0,
705 track_volume: false,
706 total_unclaimed_tokens: 0,
707 total_claimed_tokens: 0,
708 current_sol_volume: 0,
709 last_update_timestamp: 0,
710 ix_name: "sell".to_string(),
711 mayhem_mode: false,
712 cashback_fee_basis_points: 0,
713 cashback: 0,
714 is_cashback_coin: false,
715 associated_bonding_curve: get_account(4).unwrap_or_default(),
716 token_program: get_token_program_or_default(get_account(9).unwrap_or_default()),
717 creator_vault: get_account(8).unwrap_or_default(),
718 account: None,
719 }))
720 }
721
722 #[inline]
724 fn parse_buy_exact_sol_in_instruction(
725 data: &[u8],
726 accounts: &[Pubkey],
727 ix_accounts: &[u8],
728 signature: solana_sdk::signature::Signature,
729 slot: u64,
730 tx_index: u64,
731 recv_us: i64,
732 created_mints: &HashSet<Pubkey>,
733 mayhem_mints: &HashSet<Pubkey>,
734 ) -> Option<DexEvent> {
735 use crate::instr::utils::*;
736 use crate::core::events::*;
737
738 if ix_accounts.len() < 7 {
739 return None;
740 }
741
742 let get_account = |idx: usize| -> Option<Pubkey> {
743 ix_accounts.get(idx).and_then(|&i| accounts.get(i as usize)).copied()
744 };
745
746 let (sol_amount, token_amount) = if data.len() >= 16 {
748 (read_u64_le(data, 0).unwrap_or(0), read_u64_le(data, 8).unwrap_or(0))
749 } else {
750 (0, 0)
751 };
752
753 let mint = get_account(2)?;
754
755 let is_created_buy = created_mints.contains(&mint);
757
758 let is_mayhem_mode = mayhem_mints.contains(&mint);
760
761 let metadata = EventMetadata {
762 signature,
763 slot,
764 tx_index,
765 block_time_us: 0,
766 grpc_recv_us: recv_us,
767 recent_blockhash: None,
768 };
769
770 Some(DexEvent::PumpFunTrade(PumpFunTradeEvent {
771 metadata,
772 mint,
773 bonding_curve: get_account(3).unwrap_or_default(),
774 user: get_account(6).unwrap_or_default(),
775 sol_amount,
776 token_amount,
777 fee_recipient: get_account(1).unwrap_or_default(),
778 is_buy: true,
779 is_created_buy,
780 timestamp: 0,
781 virtual_sol_reserves: 0,
782 virtual_token_reserves: 0,
783 real_sol_reserves: 0,
784 real_token_reserves: 0,
785 fee_basis_points: 0,
786 fee: 0,
787 creator: Pubkey::default(),
788 creator_fee_basis_points: 0,
789 creator_fee: 0,
790 track_volume: false,
791 total_unclaimed_tokens: 0,
792 total_claimed_tokens: 0,
793 current_sol_volume: 0,
794 last_update_timestamp: 0,
795 ix_name: "buy_exact_sol_in".to_string(),
796 mayhem_mode: is_mayhem_mode,
797 cashback_fee_basis_points: 0,
798 cashback: 0,
799 is_cashback_coin: false,
800 associated_bonding_curve: get_account(4).unwrap_or_default(),
801 token_program: get_token_program_or_default(get_account(8).unwrap_or_default()),
802 creator_vault: get_account(9).unwrap_or_default(),
803 account: None,
804 }))
805 }
806}