1use std::collections::HashMap;
5use std::str::FromStr;
6
7use memchr::memmem;
8use once_cell::sync::Lazy;
9use solana_sdk::pubkey::Pubkey;
10use yellowstone_grpc_proto::prelude::{
11 SubscribeUpdateTransaction, Transaction, TransactionStatusMeta,
12};
13
14use super::transaction_meta::try_yellowstone_signature;
15use super::types::EventTypeFilter;
16use crate::DexEvent;
17
18static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
19 Lazy::new(|| memmem::Finder::new(b"Program data: "));
20
21#[inline]
23pub fn parse_subscribe_update_transaction(
24 tx: &SubscribeUpdateTransaction,
25 grpc_recv_us: i64,
26 block_us: Option<i64>,
27 filter: Option<&EventTypeFilter>,
28) -> Vec<DexEvent> {
29 parse_transaction_core(tx, grpc_recv_us, block_us, filter)
30}
31
32#[inline]
33pub(crate) fn parse_transaction_core(
34 tx: &SubscribeUpdateTransaction,
35 grpc_us: i64,
36 block_us: Option<i64>,
37 filter: Option<&EventTypeFilter>,
38) -> Vec<DexEvent> {
39 let Some(info) = &tx.transaction else { return Vec::new() };
40 let Some(meta) = &info.meta else { return Vec::new() };
41
42 let sig = extract_signature(&info.signature);
43 let slot = tx.slot;
44 let idx = info.index;
45
46 let (log_events, instr_events) = rayon::join(
47 || {
48 parse_logs(
49 meta,
50 &info.transaction,
51 &meta.log_messages,
52 sig,
53 slot,
54 idx,
55 block_us,
56 grpc_us,
57 filter,
58 )
59 },
60 || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
61 );
62
63 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
64}
65
66#[inline]
71pub fn parse_subscribe_update_transaction_low_latency(
72 tx: &SubscribeUpdateTransaction,
73 grpc_recv_us: i64,
74 block_us: Option<i64>,
75 filter: Option<&EventTypeFilter>,
76) -> Vec<DexEvent> {
77 parse_transaction_core_sequential(tx, grpc_recv_us, block_us, filter)
78}
79
80#[inline]
81fn parse_transaction_core_sequential(
82 tx: &SubscribeUpdateTransaction,
83 grpc_us: i64,
84 block_us: Option<i64>,
85 filter: Option<&EventTypeFilter>,
86) -> Vec<DexEvent> {
87 let Some(info) = &tx.transaction else {
88 return Vec::new();
89 };
90 let Some(meta) = &info.meta else {
91 return Vec::new();
92 };
93
94 let sig = extract_signature(&info.signature);
95 let slot = tx.slot;
96 let idx = info.index;
97
98 let log_events = parse_logs(
99 meta,
100 &info.transaction,
101 &meta.log_messages,
102 sig,
103 slot,
104 idx,
105 block_us,
106 grpc_us,
107 filter,
108 );
109 let instr_events =
110 parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
111
112 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
113}
114
115#[inline(always)]
116pub(crate) fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
117 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
118}
119
120#[inline]
121fn parse_logs(
122 meta: &TransactionStatusMeta,
123 transaction: &Option<Transaction>,
124 logs: &[String],
125 sig: solana_sdk::signature::Signature,
126 slot: u64,
127 tx_idx: u64,
128 block_us: Option<i64>,
129 grpc_us: i64,
130 filter: Option<&EventTypeFilter>,
131) -> Vec<DexEvent> {
132 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
133 if m.recent_blockhash.is_empty() {
134 None
135 } else {
136 Some(m.recent_blockhash.clone())
137 }
138 });
139
140 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
141 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
142
143 let mut outer_idx: i32 = -1;
144 let mut inner_idx: i32 = -1;
145 let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
146 let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
147 let mut result = Vec::with_capacity(4);
148
149 for log in logs {
150 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
151 if depth == 1 {
152 inner_idx = -1;
153 outer_idx += 1;
154 } else {
155 inner_idx += 1;
156 }
157 if let Ok(pk) = Pubkey::from_str(pid) {
158 active_program_stack.truncate(depth.saturating_sub(1));
159 active_program_stack.push(pk);
160 invokes.entry(pk).or_default().push((outer_idx, inner_idx));
161 }
162 }
163
164 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
165 let current_program = active_program_stack.last();
166 if let Some(mut e) = crate::logs::parse_log_with_program_id(
167 log,
168 sig,
169 slot,
170 tx_idx,
171 block_us,
172 grpc_us,
173 filter,
174 has_create,
175 recent_blockhash.as_deref(),
176 current_program,
177 ) {
178 crate::core::account_dispatcher::fill_accounts_with_owned_keys(
179 &mut e,
180 meta,
181 transaction,
182 &invokes,
183 );
184 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
185 result.push(e);
186 }
187 }
188
189 if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
190 if let Ok(pk) = Pubkey::from_str(pid) {
191 if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
192 active_program_stack.truncate(pos);
193 }
194 }
195 }
196 }
197 result
198}
199
200#[inline]
201fn parse_instructions(
202 meta: &TransactionStatusMeta,
203 transaction: &Option<Transaction>,
204 sig: solana_sdk::signature::Signature,
205 slot: u64,
206 tx_idx: u64,
207 block_us: Option<i64>,
208 grpc_us: i64,
209 filter: Option<&EventTypeFilter>,
210) -> Vec<DexEvent> {
211 crate::grpc::instruction_parser::parse_instructions_enhanced(
212 meta,
213 transaction,
214 sig,
215 slot,
216 tx_idx,
217 block_us,
218 grpc_us,
219 filter,
220 )
221}