Skip to main content

sol_parser_sdk/grpc/
instruction_parser.rs

1//! Instruction 解析器 - 完整支持 instruction + inner instruction
2//!
3//! 设计原则:
4//! - 简洁:单一入口函数,清晰的解析流程
5//! - 高性能:零拷贝,内联优化,并行处理
6//! - 可读性:每个步骤都有明确的注释
7
8use crate::core::{events::*, merger::merge_events, pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge};
9use crate::grpc::types::EventTypeFilter;
10use crate::instr::read_pubkey_fast;
11use solana_sdk::pubkey::Pubkey;
12use solana_sdk::signature::Signature;
13use std::collections::HashMap;
14use yellowstone_grpc_proto::prelude::{Transaction, TransactionStatusMeta};
15
16/// 解析交易中的所有指令事件(instruction + inner instruction)
17///
18/// # 解析流程
19/// 1. 解析主指令(outer instructions)- 8字节 discriminator
20/// 2. 解析内部指令(inner instructions)- 16字节 discriminator
21/// 3. 合并相关事件(instruction + inner instruction)
22/// 4. 填充账户上下文
23///
24/// # 性能优化
25/// - 零分配泄漏:`program_invokes` 全程 `Pubkey` 键,与账户填充 / `fill_data` 共用同一表
26/// - 零拷贝读取指令账户字节、`read_pubkey_fast` 解码
27/// - 热路径 `#[inline]`
28/// - `should_parse_instructions` 提前跳过整段 ix 解析
29#[inline]
30pub fn parse_instructions_enhanced(
31    meta: &TransactionStatusMeta,
32    transaction: &Option<Transaction>,
33    sig: Signature,
34    slot: u64,
35    tx_idx: u64,
36    block_us: Option<i64>,
37    grpc_us: i64,
38    filter: Option<&EventTypeFilter>,
39) -> Vec<DexEvent> {
40    let Some(tx) = transaction else { return Vec::new() };
41    let Some(msg) = &tx.message else { return Vec::new() };
42
43    let recent_blockhash = if msg.recent_blockhash.is_empty() {
44        None
45    } else {
46        Some(bs58::encode(&msg.recent_blockhash).into_string())
47    };
48
49    // 提前检查:是否需要解析 instruction(根据 filter)
50    if !should_parse_instructions(filter) {
51        return Vec::new();
52    }
53
54    // 与 log 解析一致:同笔交易内若有 PumpFun create,则本 tx 的 buy 事件标记为 is_created_buy(创建者首次买入)
55    let is_created_buy = crate::logs::optimized_matcher::detect_pumpfun_create(&meta.log_messages);
56
57    // 构建账户查找表
58    let keys_len = msg.account_keys.len();
59    let writable_len = meta.loaded_writable_addresses.len();
60    let get_key = |i: usize| -> Option<&Vec<u8>> {
61        if i < keys_len {
62            msg.account_keys.get(i)
63        } else if i < keys_len + writable_len {
64            meta.loaded_writable_addresses.get(i - keys_len)
65        } else {
66            meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
67        }
68    };
69
70    let mut result = Vec::with_capacity(8);
71    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
72
73    // 步骤 1: 解析所有主指令
74    for (i, ix) in msg.instructions.iter().enumerate() {
75        let pid = get_key(ix.program_id_index as usize)
76            .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
77
78        invokes.entry(pid).or_default().push((i as i32, -1));
79
80        // 解析主指令(8字节 discriminator)
81        if let Some(event) = parse_outer_instruction(
82            &ix.data,
83            &pid,
84            sig,
85            slot,
86            tx_idx,
87            block_us,
88            grpc_us,
89            &ix.accounts,
90            &get_key,
91            filter,
92            is_created_buy,
93        ) {
94            result.push((i, None, event)); // (outer_idx, inner_idx, event)
95        }
96    }
97
98    // 步骤 2: 解析所有 inner instructions
99    for inner in &meta.inner_instructions {
100        let outer_idx = inner.index as usize;
101
102        for (j, inner_ix) in inner.instructions.iter().enumerate() {
103            let pid = get_key(inner_ix.program_id_index as usize)
104                .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
105
106            invokes.entry(pid).or_default().push((outer_idx as i32, j as i32));
107
108            // 解析 inner instruction(16字节 discriminator)
109            if let Some(event) = parse_inner_instruction(
110                &inner_ix.data,
111                &pid,
112                sig,
113                slot,
114                tx_idx,
115                block_us,
116                grpc_us,
117                filter,
118                is_created_buy,
119            ) {
120                result.push((outer_idx, Some(j), event)); // (outer_idx, Some(inner_idx), event)
121            }
122        }
123    }
124
125    // 步骤 3: 合并相关事件(instruction + inner instruction)
126    let mut merged = merge_instruction_events(result);
127    enrich_pumpfun_same_tx_post_merge(&mut merged);
128
129    for e in merged.iter_mut() {
130        if let Some(m) = e.metadata_mut() {
131            m.recent_blockhash = recent_blockhash.clone();
132        }
133    }
134
135    // 步骤 4: 填充账户上下文(invokes 与 fill_data 均使用 Pubkey 键,无堆泄漏)
136    let mut final_result = Vec::with_capacity(merged.len());
137    for mut event in merged {
138        crate::core::account_dispatcher::fill_accounts_with_owned_keys(
139            &mut event,
140            meta,
141            transaction,
142            &invokes,
143        );
144        crate::core::common_filler::fill_data(&mut event, meta, transaction, &invokes);
145        final_result.push(event);
146    }
147
148    final_result
149}
150
151// ============================================================================
152// 辅助函数
153// ============================================================================
154
155/// 解析单个主指令(outer instruction)
156///
157/// 主指令使用 8 字节 discriminator
158#[inline(always)]
159fn parse_outer_instruction<'a>(
160    data: &[u8],
161    program_id: &Pubkey,
162    sig: Signature,
163    slot: u64,
164    tx_idx: u64,
165    block_us: Option<i64>,
166    grpc_us: i64,
167    account_indices: &[u8],
168    get_key: &dyn Fn(usize) -> Option<&'a Vec<u8>>,
169    filter: Option<&EventTypeFilter>,
170    _is_created_buy: bool,
171) -> Option<DexEvent> {
172    // 检查指令数据长度(至少8字节 discriminator)
173    if data.len() < 8 {
174        return None;
175    }
176
177    // 常见 DEX 指令账户数远小于 64;栈上缓冲避免每笔 outer 一次 Vec 分配
178    const STACK_CAP: usize = 64;
179    if account_indices.len() <= STACK_CAP {
180        let mut stack = [Pubkey::default(); STACK_CAP];
181        let mut n = 0usize;
182        for &idx in account_indices {
183            if let Some(k) = get_key(idx as usize) {
184                stack[n] = read_pubkey_fast(k);
185                n += 1;
186            }
187        }
188        crate::instr::parse_instruction_unified(
189            data, &stack[..n], sig, slot, tx_idx, block_us, grpc_us, filter, program_id,
190        )
191    } else {
192        let accounts: Vec<Pubkey> = account_indices
193            .iter()
194            .filter_map(|&idx| get_key(idx as usize).map(|k| read_pubkey_fast(k)))
195            .collect();
196        crate::instr::parse_instruction_unified(
197            data, &accounts, sig, slot, tx_idx, block_us, grpc_us, filter, program_id,
198        )
199    }
200}
201
202/// 解析单个 inner instruction
203///
204/// Inner instructions 使用 16 字节 discriminator(前8字节是event hash,后8字节是magic)
205#[inline(always)]
206fn parse_inner_instruction(
207    data: &[u8],
208    program_id: &Pubkey,
209    sig: Signature,
210    slot: u64,
211    tx_idx: u64,
212    block_us: Option<i64>,
213    grpc_us: i64,
214    filter: Option<&EventTypeFilter>,
215    is_created_buy: bool,
216) -> Option<DexEvent> {
217    // 检查数据长度(至少16字节 discriminator)
218    if data.len() < 16 {
219        return None;
220    }
221
222    let metadata = EventMetadata {
223        signature: sig,
224        slot,
225        tx_index: tx_idx,
226        block_time_us: block_us.unwrap_or(0),
227        grpc_recv_us: grpc_us,
228        recent_blockhash: None, // set later on merged events in parse_instructions_enhanced
229    };
230
231    // 提取 16 字节 discriminator
232    let mut discriminator = [0u8; 16];
233    discriminator.copy_from_slice(&data[..16]);
234    let inner_data = &data[16..];
235
236    use crate::instr::{all_inner, program_ids, pump_amm_inner, pump_inner, raydium_clmm_inner};
237
238    // 根据 program_id 路由到对应的 inner instruction 解析器
239    if *program_id == program_ids::PUMPFUN_PROGRAM_ID {
240        if let Some(f) = filter {
241            if !f.includes_pumpfun() {
242                return None;
243            }
244        }
245        pump_inner::parse_pumpfun_inner_instruction(
246            &discriminator,
247            inner_data,
248            metadata,
249            is_created_buy,
250        )
251    } else if *program_id == program_ids::PUMPSWAP_PROGRAM_ID {
252        if let Some(f) = filter {
253            if !f.includes_pumpswap() {
254                return None;
255            }
256        }
257        pump_amm_inner::parse_pumpswap_inner_instruction(&discriminator, inner_data, metadata)
258    } else if *program_id == program_ids::RAYDIUM_CLMM_PROGRAM_ID {
259        raydium_clmm_inner::parse_raydium_clmm_inner_instruction(
260            &discriminator,
261            inner_data,
262            metadata,
263        )
264    } else if *program_id == program_ids::RAYDIUM_CPMM_PROGRAM_ID {
265        all_inner::raydium_cpmm::parse(&discriminator, inner_data, metadata)
266    } else if *program_id == program_ids::RAYDIUM_AMM_V4_PROGRAM_ID {
267        all_inner::raydium_amm::parse(&discriminator, inner_data, metadata)
268    } else if *program_id == program_ids::ORCA_WHIRLPOOL_PROGRAM_ID {
269        all_inner::orca::parse(&discriminator, inner_data, metadata)
270    } else if *program_id == program_ids::METEORA_POOLS_PROGRAM_ID {
271        all_inner::meteora_amm::parse(&discriminator, inner_data, metadata)
272    } else if *program_id == program_ids::METEORA_DAMM_V2_PROGRAM_ID {
273        if let Some(f) = filter {
274            if !f.includes_meteora_damm_v2() {
275                return None;
276            }
277        }
278        all_inner::meteora_damm::parse(&discriminator, inner_data, metadata)
279    } else if *program_id == program_ids::BONK_PROGRAM_ID {
280        all_inner::bonk::parse(&discriminator, inner_data, metadata)
281    } else {
282        None
283    }
284}
285
286/// 合并相关的 instruction 和 inner instruction 事件
287///
288/// 合并策略:
289/// 1. 同一个 outer_idx 的 instruction 和 inner instruction 可以合并
290/// 2. Inner instruction 在 outer instruction 之后出现(排序保证主指令在前)
291/// 3. 同一 outer 下若有多个 inner,依次链式合并进同一条事件,再输出
292/// 4. 合并后返回更完整的事件
293#[inline(always)]
294fn merge_instruction_events(events: Vec<(usize, Option<usize>, DexEvent)>) -> Vec<DexEvent> {
295    if events.is_empty() {
296        return Vec::new();
297    }
298
299    // 按 (outer_idx, inner_idx) 排序,确保顺序:同一 outer 下 **主指令在前、inner 在后**
300    // (`None` 若用 MAX 会把 outer 排到 inner 后面,导致无法 merge)
301    let mut events = events;
302    events.sort_by_key(|(outer, inner, _)| (*outer, inner.map_or(0, |i| i + 1)));
303
304    let mut result = Vec::with_capacity(events.len());
305    let mut pending_outer: Option<(usize, DexEvent)> = None;
306
307    for (outer_idx, inner_idx, event) in events {
308        match inner_idx {
309            None => {
310                // 这是一个 outer instruction
311                // 先处理之前的 pending_outer
312                if let Some((_, outer_event)) = pending_outer.take() {
313                    result.push(outer_event);
314                }
315                // 保存当前的 outer instruction,等待可能的 inner instruction
316                pending_outer = Some((outer_idx, event));
317            }
318            Some(_) => {
319                // 这是一个 inner instruction
320                if let Some((pending_outer_idx, mut outer_event)) = pending_outer.take() {
321                    if pending_outer_idx == outer_idx {
322                        // 合并进当前 outer(可多次:多段 inner 链式叠在同一条事件上)
323                        merge_events(&mut outer_event, event);
324                        pending_outer = Some((outer_idx, outer_event));
325                    } else {
326                        // 不匹配,分别保留
327                        result.push(outer_event);
328                        result.push(event);
329                    }
330                } else {
331                    // 没有 pending outer,直接添加 inner event
332                    result.push(event);
333                }
334            }
335        }
336    }
337
338    // 处理最后一个 pending_outer
339    if let Some((_, outer_event)) = pending_outer {
340        result.push(outer_event);
341    }
342
343    result
344}
345
346/// 检查是否需要解析 instructions(根据 filter)
347#[inline(always)]
348fn should_parse_instructions(filter: Option<&EventTypeFilter>) -> bool {
349    // 如果没有 filter,总是解析
350    let Some(filter) = filter else { return true };
351
352    // 如果 filter.include_only 为空,总是解析
353    let Some(ref include_only) = filter.include_only else { return true };
354
355    // PumpFun:外层 BUY/SELL 在 `instr/pump.rs` 不解析,但每笔买 inner 里仍有 Trade CPI;
356    // 仅走 `log_messages` 时,若 RPC 截断日志会 **丢多笔 Trade**。
357    // 打开 instruction+inner 解析,与日志在 `dedupe_log_instruction_events` 中按序去重合并。
358    if filter.includes_pumpfun() {
359        return true;
360    }
361
362    if filter.includes_pump_fees() {
363        return true;
364    }
365
366    // 其它协议:按需解析
367    include_only.iter().any(|t| {
368        use crate::grpc::types::EventType::*;
369        matches!(
370            t,
371            PumpFunMigrate
372                | MeteoraDammV2Swap
373                | MeteoraDammV2AddLiquidity
374                | MeteoraDammV2CreatePosition
375                | MeteoraDammV2ClosePosition
376                | MeteoraDammV2RemoveLiquidity
377        )
378    })
379}
380
381#[cfg(test)]
382mod tests {
383    use super::*;
384
385    #[test]
386    fn test_should_parse_instructions() {
387        // 无 filter - 应该解析
388        assert!(should_parse_instructions(None));
389
390        // 有 filter 但 include_only 为空 - 应该解析
391        let filter = EventTypeFilter { include_only: None, exclude_types: None };
392        assert!(should_parse_instructions(Some(&filter)));
393
394        // 包含需要 instruction 解析的事件类型
395        use crate::grpc::types::EventType;
396        let filter = EventTypeFilter {
397            include_only: Some(vec![EventType::PumpFunMigrate]),
398            exclude_types: None,
399        };
400        assert!(should_parse_instructions(Some(&filter)));
401
402        // PumpFun 订阅:需要 instruction+inner,避免仅日志时截断丢腿
403        let filter = EventTypeFilter {
404            include_only: Some(vec![EventType::PumpFunTrade]),
405            exclude_types: None,
406        };
407        assert!(should_parse_instructions(Some(&filter)));
408    }
409
410    #[test]
411    fn test_merge_instruction_events() {
412        use solana_sdk::signature::Signature;
413
414        let metadata = EventMetadata {
415            signature: Signature::default(),
416            slot: 100,
417            tx_index: 1,
418            block_time_us: 1000,
419            grpc_recv_us: 2000,
420            recent_blockhash: None,
421        };
422
423        // 模拟:outer instruction + inner instruction(应该合并)
424        let outer_event = DexEvent::PumpFunTrade(PumpFunTradeEvent {
425            metadata: metadata.clone(),
426            bonding_curve: Pubkey::new_unique(),
427            ..Default::default()
428        });
429
430        let inner_event = DexEvent::PumpFunTrade(PumpFunTradeEvent {
431            metadata: metadata.clone(),
432            sol_amount: 1000,
433            token_amount: 2000,
434            ..Default::default()
435        });
436
437        let events = vec![
438            (0, None, outer_event),    // outer instruction at index 0
439            (0, Some(0), inner_event), // inner instruction at index 0
440        ];
441
442        let result = merge_instruction_events(events);
443
444        // 应该合并为1个事件
445        assert_eq!(result.len(), 1);
446
447        // 验证合并结果包含两者的数据
448        if let DexEvent::PumpFunTrade(trade) = &result[0] {
449            assert_eq!(trade.sol_amount, 1000); // 来自 inner
450            assert_eq!(trade.token_amount, 2000); // 来自 inner
451            assert_ne!(trade.bonding_curve, Pubkey::default()); // 来自 outer
452        } else {
453            panic!("Expected PumpFunTrade event");
454        }
455    }
456
457    #[test]
458    fn test_merge_instruction_events_chains_multiple_inners_same_outer() {
459        use solana_sdk::signature::Signature;
460
461        let metadata = EventMetadata {
462            signature: Signature::default(),
463            slot: 100,
464            tx_index: 1,
465            block_time_us: 1000,
466            grpc_recv_us: 2000,
467            recent_blockhash: None,
468        };
469
470        let bc = Pubkey::new_unique();
471        let fee = Pubkey::new_unique();
472
473        let outer_event = DexEvent::PumpFunTrade(PumpFunTradeEvent {
474            metadata: metadata.clone(),
475            bonding_curve: bc,
476            ..Default::default()
477        });
478
479        let inner_trade = DexEvent::PumpFunTrade(PumpFunTradeEvent {
480            metadata: metadata.clone(),
481            sol_amount: 1000,
482            token_amount: 2000,
483            is_buy: true,
484            ..Default::default()
485        });
486
487        // 第二段 inner 仅有 fee_recipient,无成交量 —— 不应抹掉第一段金额
488        let inner_fee_only = DexEvent::PumpFunTrade(PumpFunTradeEvent {
489            metadata: metadata.clone(),
490            fee_recipient: fee,
491            ..Default::default()
492        });
493
494        let events = vec![
495            (0, None, outer_event),
496            (0, Some(0), inner_trade),
497            (0, Some(1), inner_fee_only),
498        ];
499
500        let result = merge_instruction_events(events);
501        assert_eq!(result.len(), 1);
502        if let DexEvent::PumpFunTrade(trade) = &result[0] {
503            assert_eq!(trade.bonding_curve, bc);
504            assert_eq!(trade.sol_amount, 1000);
505            assert_eq!(trade.token_amount, 2000);
506            assert_eq!(trade.fee_recipient, fee);
507        } else {
508            panic!("Expected PumpFunTrade event");
509        }
510    }
511}