Skip to main content

sol_parser_sdk/grpc/
yellowstone_tx_parse.rs

1//! Yellowstone `SubscribeUpdateTransaction` 单笔解析(logs ∥ instructions + 去重)。
2//! 从 [`super::client`] 抽出,供 crate 内与下游 streamer 复用。
3
4use std::collections::HashMap;
5use std::str::FromStr;
6
7use memchr::memmem;
8use once_cell::sync::Lazy;
9use solana_sdk::pubkey::Pubkey;
10use yellowstone_grpc_proto::prelude::{
11    SubscribeUpdateTransaction, Transaction, TransactionStatusMeta,
12};
13
14use super::transaction_meta::try_yellowstone_signature;
15use super::types::EventTypeFilter;
16use crate::DexEvent;
17
18static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
19    Lazy::new(|| memmem::Finder::new(b"Program data: "));
20
21/// 解析单笔 Yellowstone 交易更新(含 meta):并行 logs + enhanced instructions,再 log/ix 去重合并。
22#[inline]
23pub fn parse_subscribe_update_transaction(
24    tx: &SubscribeUpdateTransaction,
25    grpc_recv_us: i64,
26    block_us: Option<i64>,
27    filter: Option<&EventTypeFilter>,
28) -> Vec<DexEvent> {
29    parse_transaction_core(tx, grpc_recv_us, block_us, filter)
30}
31
32#[inline]
33pub(crate) fn parse_transaction_core(
34    tx: &SubscribeUpdateTransaction,
35    grpc_us: i64,
36    block_us: Option<i64>,
37    filter: Option<&EventTypeFilter>,
38) -> Vec<DexEvent> {
39    let Some(info) = &tx.transaction else { return Vec::new() };
40    let Some(meta) = &info.meta else { return Vec::new() };
41
42    let sig = extract_signature(&info.signature);
43    let slot = tx.slot;
44    let idx = info.index;
45
46    let (log_events, instr_events) = rayon::join(
47        || {
48            parse_logs(
49                meta,
50                &info.transaction,
51                &meta.log_messages,
52                sig,
53                slot,
54                idx,
55                block_us,
56                grpc_us,
57                filter,
58            )
59        },
60        || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
61    );
62
63    crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
64}
65
66/// 单笔交易解析:**顺序**执行 logs → instructions 再合并。
67///
68/// 与 [`parse_subscribe_update_transaction`](内部 `rayon::join` 并行)算法一致,但避免工作窃取与线程池调度,
69/// 在「单笔极低延迟」场景通常更快;适合嵌入 latency-sensitive 的订阅流水线。
70#[inline]
71pub fn parse_subscribe_update_transaction_low_latency(
72    tx: &SubscribeUpdateTransaction,
73    grpc_recv_us: i64,
74    block_us: Option<i64>,
75    filter: Option<&EventTypeFilter>,
76) -> Vec<DexEvent> {
77    parse_transaction_core_sequential(tx, grpc_recv_us, block_us, filter)
78}
79
80#[inline]
81fn parse_transaction_core_sequential(
82    tx: &SubscribeUpdateTransaction,
83    grpc_us: i64,
84    block_us: Option<i64>,
85    filter: Option<&EventTypeFilter>,
86) -> Vec<DexEvent> {
87    let Some(info) = &tx.transaction else {
88        return Vec::new();
89    };
90    let Some(meta) = &info.meta else {
91        return Vec::new();
92    };
93
94    let sig = extract_signature(&info.signature);
95    let slot = tx.slot;
96    let idx = info.index;
97
98    let log_events = parse_logs(
99        meta,
100        &info.transaction,
101        &meta.log_messages,
102        sig,
103        slot,
104        idx,
105        block_us,
106        grpc_us,
107        filter,
108    );
109    let instr_events =
110        parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
111
112    crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
113}
114
115#[inline(always)]
116pub(crate) fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
117    try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
118}
119
120#[inline]
121fn parse_logs(
122    meta: &TransactionStatusMeta,
123    transaction: &Option<Transaction>,
124    logs: &[String],
125    sig: solana_sdk::signature::Signature,
126    slot: u64,
127    tx_idx: u64,
128    block_us: Option<i64>,
129    grpc_us: i64,
130    filter: Option<&EventTypeFilter>,
131) -> Vec<DexEvent> {
132    let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
133        if m.recent_blockhash.is_empty() {
134            None
135        } else {
136            Some(m.recent_blockhash.clone())
137        }
138    });
139
140    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
141    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
142
143    let mut outer_idx: i32 = -1;
144    let mut inner_idx: i32 = -1;
145    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
146    let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
147    let mut result = Vec::with_capacity(4);
148
149    for log in logs {
150        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
151            if depth == 1 {
152                inner_idx = -1;
153                outer_idx += 1;
154            } else {
155                inner_idx += 1;
156            }
157            if let Ok(pk) = Pubkey::from_str(pid) {
158                active_program_stack.truncate(depth.saturating_sub(1));
159                active_program_stack.push(pk);
160                invokes.entry(pk).or_default().push((outer_idx, inner_idx));
161            }
162        }
163
164        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
165            let current_program = active_program_stack.last();
166            if let Some(mut e) = crate::logs::parse_log_with_program_id(
167                log,
168                sig,
169                slot,
170                tx_idx,
171                block_us,
172                grpc_us,
173                filter,
174                has_create,
175                recent_blockhash.as_deref(),
176                current_program,
177            ) {
178                crate::core::account_dispatcher::fill_accounts_with_owned_keys(
179                    &mut e,
180                    meta,
181                    transaction,
182                    &invokes,
183                );
184                crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
185                result.push(e);
186            }
187        }
188
189        if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
190            if let Ok(pk) = Pubkey::from_str(pid) {
191                if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
192                    active_program_stack.truncate(pos);
193                }
194            }
195        }
196    }
197    result
198}
199
200#[inline]
201fn parse_instructions(
202    meta: &TransactionStatusMeta,
203    transaction: &Option<Transaction>,
204    sig: solana_sdk::signature::Signature,
205    slot: u64,
206    tx_idx: u64,
207    block_us: Option<i64>,
208    grpc_us: i64,
209    filter: Option<&EventTypeFilter>,
210) -> Vec<DexEvent> {
211    crate::grpc::instruction_parser::parse_instructions_enhanced(
212        meta,
213        transaction,
214        sig,
215        slot,
216        tx_idx,
217        block_us,
218        grpc_us,
219        filter,
220    )
221}