1use std::collections::HashMap;
5use std::str::FromStr;
6
7use memchr::memmem;
8use once_cell::sync::Lazy;
9use solana_sdk::pubkey::Pubkey;
10use yellowstone_grpc_proto::prelude::{
11 SubscribeUpdateTransaction, Transaction, TransactionStatusMeta,
12};
13
14use super::transaction_meta::try_yellowstone_signature;
15use super::types::EventTypeFilter;
16use crate::DexEvent;
17
18static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
19 Lazy::new(|| memmem::Finder::new(b"Program data: "));
20
21#[inline]
23pub fn parse_subscribe_update_transaction(
24 tx: &SubscribeUpdateTransaction,
25 grpc_recv_us: i64,
26 block_us: Option<i64>,
27 filter: Option<&EventTypeFilter>,
28) -> Vec<DexEvent> {
29 parse_transaction_core(tx, grpc_recv_us, block_us, filter)
30}
31
32#[inline]
33pub(crate) fn parse_transaction_core(
34 tx: &SubscribeUpdateTransaction,
35 grpc_us: i64,
36 block_us: Option<i64>,
37 filter: Option<&EventTypeFilter>,
38) -> Vec<DexEvent> {
39 let Some(info) = &tx.transaction else { return Vec::new() };
40 let Some(meta) = &info.meta else { return Vec::new() };
41
42 let sig = extract_signature(&info.signature);
43 let slot = tx.slot;
44 let idx = info.index;
45
46 let (log_events, instr_events) = rayon::join(
47 || {
48 parse_logs(
49 meta,
50 &info.transaction,
51 &meta.log_messages,
52 sig,
53 slot,
54 idx,
55 block_us,
56 grpc_us,
57 filter,
58 )
59 },
60 || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
61 );
62
63 let events =
64 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
65 if let Some(filter) = filter {
66 events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
67 } else {
68 events
69 }
70}
71
72#[inline]
77pub fn parse_subscribe_update_transaction_low_latency(
78 tx: &SubscribeUpdateTransaction,
79 grpc_recv_us: i64,
80 block_us: Option<i64>,
81 filter: Option<&EventTypeFilter>,
82) -> Vec<DexEvent> {
83 parse_transaction_core_sequential(tx, grpc_recv_us, block_us, filter)
84}
85
86#[inline]
87fn parse_transaction_core_sequential(
88 tx: &SubscribeUpdateTransaction,
89 grpc_us: i64,
90 block_us: Option<i64>,
91 filter: Option<&EventTypeFilter>,
92) -> Vec<DexEvent> {
93 let Some(info) = &tx.transaction else {
94 return Vec::new();
95 };
96 let Some(meta) = &info.meta else {
97 return Vec::new();
98 };
99
100 let sig = extract_signature(&info.signature);
101 let slot = tx.slot;
102 let idx = info.index;
103
104 let log_events = parse_logs(
105 meta,
106 &info.transaction,
107 &meta.log_messages,
108 sig,
109 slot,
110 idx,
111 block_us,
112 grpc_us,
113 filter,
114 );
115 let instr_events =
116 parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
117
118 let events =
119 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
120 if let Some(filter) = filter {
121 events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
122 } else {
123 events
124 }
125}
126
127#[inline(always)]
128pub(crate) fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
129 try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
130}
131
132#[inline]
133fn parse_logs(
134 meta: &TransactionStatusMeta,
135 transaction: &Option<Transaction>,
136 logs: &[String],
137 sig: solana_sdk::signature::Signature,
138 slot: u64,
139 tx_idx: u64,
140 block_us: Option<i64>,
141 grpc_us: i64,
142 filter: Option<&EventTypeFilter>,
143) -> Vec<DexEvent> {
144 let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
145 if m.recent_blockhash.is_empty() {
146 None
147 } else {
148 Some(m.recent_blockhash.clone())
149 }
150 });
151
152 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
153 let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
154
155 let mut outer_idx: i32 = -1;
156 let mut inner_idx: i32 = -1;
157 let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
158 let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
159 let mut result = Vec::with_capacity(4);
160
161 for log in logs {
162 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
163 if depth == 1 {
164 inner_idx = -1;
165 outer_idx += 1;
166 } else {
167 inner_idx += 1;
168 }
169 if let Ok(pk) = Pubkey::from_str(pid) {
170 active_program_stack.truncate(depth.saturating_sub(1));
171 active_program_stack.push(pk);
172 invokes.entry(pk).or_default().push((outer_idx, inner_idx));
173 }
174 }
175
176 if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
177 let current_program = active_program_stack.last();
178 if let Some(mut e) = crate::logs::parse_log_with_program_id(
179 log,
180 sig,
181 slot,
182 tx_idx,
183 block_us,
184 grpc_us,
185 filter,
186 has_create,
187 recent_blockhash.as_deref(),
188 current_program,
189 ) {
190 crate::core::account_dispatcher::fill_accounts_with_owned_keys(
191 &mut e,
192 meta,
193 transaction,
194 &invokes,
195 );
196 crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
197 result.push(e);
198 }
199 }
200
201 if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
202 if let Ok(pk) = Pubkey::from_str(pid) {
203 if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
204 active_program_stack.truncate(pos);
205 }
206 }
207 }
208 }
209 result
210}
211
212#[inline]
213fn parse_instructions(
214 meta: &TransactionStatusMeta,
215 transaction: &Option<Transaction>,
216 sig: solana_sdk::signature::Signature,
217 slot: u64,
218 tx_idx: u64,
219 block_us: Option<i64>,
220 grpc_us: i64,
221 filter: Option<&EventTypeFilter>,
222) -> Vec<DexEvent> {
223 crate::grpc::instruction_parser::parse_instructions_enhanced(
224 meta,
225 transaction,
226 sig,
227 slot,
228 tx_idx,
229 block_us,
230 grpc_us,
231 filter,
232 )
233}