Skip to main content

sol_parser_sdk/grpc/
yellowstone_tx_parse.rs

1//! Yellowstone `SubscribeUpdateTransaction` 单笔解析(logs ∥ instructions + 去重)。
2//! 从 [`super::client`] 抽出,供 crate 内与下游 streamer 复用。
3
4use std::collections::HashMap;
5use std::str::FromStr;
6
7use memchr::memmem;
8use once_cell::sync::Lazy;
9use solana_sdk::pubkey::Pubkey;
10use yellowstone_grpc_proto::prelude::{
11    SubscribeUpdateTransaction, Transaction, TransactionStatusMeta,
12};
13
14use super::transaction_meta::try_yellowstone_signature;
15use super::types::EventTypeFilter;
16use crate::DexEvent;
17
18static PROGRAM_DATA_FINDER: Lazy<memmem::Finder> =
19    Lazy::new(|| memmem::Finder::new(b"Program data: "));
20
21/// 解析单笔 Yellowstone 交易更新(含 meta):并行 logs + enhanced instructions,再 log/ix 去重合并。
22#[inline]
23pub fn parse_subscribe_update_transaction(
24    tx: &SubscribeUpdateTransaction,
25    grpc_recv_us: i64,
26    block_us: Option<i64>,
27    filter: Option<&EventTypeFilter>,
28) -> Vec<DexEvent> {
29    parse_transaction_core(tx, grpc_recv_us, block_us, filter)
30}
31
32#[inline]
33pub(crate) fn parse_transaction_core(
34    tx: &SubscribeUpdateTransaction,
35    grpc_us: i64,
36    block_us: Option<i64>,
37    filter: Option<&EventTypeFilter>,
38) -> Vec<DexEvent> {
39    let Some(info) = &tx.transaction else { return Vec::new() };
40    let Some(meta) = &info.meta else { return Vec::new() };
41
42    let sig = extract_signature(&info.signature);
43    let slot = tx.slot;
44    let idx = info.index;
45
46    let (log_events, instr_events) = rayon::join(
47        || {
48            parse_logs(
49                meta,
50                &info.transaction,
51                &meta.log_messages,
52                sig,
53                slot,
54                idx,
55                block_us,
56                grpc_us,
57                filter,
58            )
59        },
60        || parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter),
61    );
62
63    let events =
64        crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
65    if let Some(filter) = filter {
66        events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
67    } else {
68        events
69    }
70}
71
72/// 单笔交易解析:**顺序**执行 logs → instructions 再合并。
73///
74/// 与 [`parse_subscribe_update_transaction`](内部 `rayon::join` 并行)算法一致,但避免工作窃取与线程池调度,
75/// 在「单笔极低延迟」场景通常更快;适合嵌入 latency-sensitive 的订阅流水线。
76#[inline]
77pub fn parse_subscribe_update_transaction_low_latency(
78    tx: &SubscribeUpdateTransaction,
79    grpc_recv_us: i64,
80    block_us: Option<i64>,
81    filter: Option<&EventTypeFilter>,
82) -> Vec<DexEvent> {
83    parse_transaction_core_sequential(tx, grpc_recv_us, block_us, filter)
84}
85
86#[inline]
87fn parse_transaction_core_sequential(
88    tx: &SubscribeUpdateTransaction,
89    grpc_us: i64,
90    block_us: Option<i64>,
91    filter: Option<&EventTypeFilter>,
92) -> Vec<DexEvent> {
93    let Some(info) = &tx.transaction else {
94        return Vec::new();
95    };
96    let Some(meta) = &info.meta else {
97        return Vec::new();
98    };
99
100    let sig = extract_signature(&info.signature);
101    let slot = tx.slot;
102    let idx = info.index;
103
104    let log_events = parse_logs(
105        meta,
106        &info.transaction,
107        &meta.log_messages,
108        sig,
109        slot,
110        idx,
111        block_us,
112        grpc_us,
113        filter,
114    );
115    let instr_events =
116        parse_instructions(meta, &info.transaction, sig, slot, idx, block_us, grpc_us, filter);
117
118    let events =
119        crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events);
120    if let Some(filter) = filter {
121        events.into_iter().map(|e| filter.normalize_dex_event(e)).collect()
122    } else {
123        events
124    }
125}
126
127#[inline(always)]
128pub(crate) fn extract_signature(bytes: &[u8]) -> solana_sdk::signature::Signature {
129    try_yellowstone_signature(bytes).expect("yellowstone signature must be 64 bytes")
130}
131
132#[inline]
133fn parse_logs(
134    meta: &TransactionStatusMeta,
135    transaction: &Option<Transaction>,
136    logs: &[String],
137    sig: solana_sdk::signature::Signature,
138    slot: u64,
139    tx_idx: u64,
140    block_us: Option<i64>,
141    grpc_us: i64,
142    filter: Option<&EventTypeFilter>,
143) -> Vec<DexEvent> {
144    let recent_blockhash = transaction.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
145        if m.recent_blockhash.is_empty() {
146            None
147        } else {
148            Some(m.recent_blockhash.clone())
149        }
150    });
151
152    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
153    let has_create = needs_pumpfun && crate::logs::optimized_matcher::detect_pumpfun_create(logs);
154
155    let mut outer_idx: i32 = -1;
156    let mut inner_idx: i32 = -1;
157    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
158    let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
159    let mut result = Vec::with_capacity(4);
160
161    for log in logs {
162        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
163            if depth == 1 {
164                inner_idx = -1;
165                outer_idx += 1;
166            } else {
167                inner_idx += 1;
168            }
169            if let Ok(pk) = Pubkey::from_str(pid) {
170                active_program_stack.truncate(depth.saturating_sub(1));
171                active_program_stack.push(pk);
172                invokes.entry(pk).or_default().push((outer_idx, inner_idx));
173            }
174        }
175
176        if PROGRAM_DATA_FINDER.find(log.as_bytes()).is_some() {
177            let current_program = active_program_stack.last();
178            if let Some(mut e) = crate::logs::parse_log_with_program_id(
179                log,
180                sig,
181                slot,
182                tx_idx,
183                block_us,
184                grpc_us,
185                filter,
186                has_create,
187                recent_blockhash.as_deref(),
188                current_program,
189            ) {
190                crate::core::account_dispatcher::fill_accounts_with_owned_keys(
191                    &mut e,
192                    meta,
193                    transaction,
194                    &invokes,
195                );
196                crate::core::common_filler::fill_data(&mut e, meta, transaction, &invokes);
197                result.push(e);
198            }
199        }
200
201        if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
202            if let Ok(pk) = Pubkey::from_str(pid) {
203                if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
204                    active_program_stack.truncate(pos);
205                }
206            }
207        }
208    }
209    result
210}
211
212#[inline]
213fn parse_instructions(
214    meta: &TransactionStatusMeta,
215    transaction: &Option<Transaction>,
216    sig: solana_sdk::signature::Signature,
217    slot: u64,
218    tx_idx: u64,
219    block_us: Option<i64>,
220    grpc_us: i64,
221    filter: Option<&EventTypeFilter>,
222) -> Vec<DexEvent> {
223    crate::grpc::instruction_parser::parse_instructions_enhanced(
224        meta,
225        transaction,
226        sig,
227        slot,
228        tx_idx,
229        block_us,
230        grpc_us,
231        filter,
232    )
233}