Skip to main content

sol_parser_sdk/grpc/
instruction_parser.rs

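//! Instruction parser - full support for instructions and inner instructions.
//!
//! Design principles:
//! - Simple: a single entry point with a clear parsing flow
//! - High performance: zero-copy, inlining, parallel processing
//! - Readable: every step carries an explicit comment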
7
8use crate::core::{
9    events::*, merger::merge_events, pumpfun_fee_enrich::enrich_pumpfun_same_tx_post_merge,
10};
11use crate::grpc::types::EventTypeFilter;
12use crate::instr::read_pubkey_fast;
13use solana_sdk::pubkey::Pubkey;
14use solana_sdk::signature::Signature;
15use std::collections::HashMap;
16use yellowstone_grpc_proto::prelude::{Transaction, TransactionStatusMeta};
17
18/// 解析交易中的所有指令事件(instruction + inner instruction)
19///
20/// # 解析流程
21/// 1. 解析主指令(outer instructions)- 8字节 discriminator
22/// 2. 解析内部指令(inner instructions)- 16字节 discriminator
23/// 3. 合并相关事件(instruction + inner instruction)
24/// 4. 填充账户上下文
25///
26/// # 性能优化
27/// - 零分配泄漏:`program_invokes` 全程 `Pubkey` 键,与账户填充 / `fill_data` 共用同一表
28/// - 零拷贝读取指令账户字节、`read_pubkey_fast` 解码
29/// - 热路径 `#[inline]`
30/// - `should_parse_instructions` 提前跳过整段 ix 解析
31#[inline]
32pub fn parse_instructions_enhanced(
33    meta: &TransactionStatusMeta,
34    transaction: &Option<Transaction>,
35    sig: Signature,
36    slot: u64,
37    tx_idx: u64,
38    block_us: Option<i64>,
39    grpc_us: i64,
40    filter: Option<&EventTypeFilter>,
41) -> Vec<DexEvent> {
42    let Some(tx) = transaction else { return Vec::new() };
43    let Some(msg) = &tx.message else { return Vec::new() };
44
45    let recent_blockhash = if msg.recent_blockhash.is_empty() {
46        None
47    } else {
48        Some(bs58::encode(&msg.recent_blockhash).into_string())
49    };
50
51    // 提前检查:是否需要解析 instruction(根据 filter)
52    if !should_parse_instructions(filter) {
53        return Vec::new();
54    }
55
56    // 与 log 解析一致:同笔交易内若有 PumpFun create,则本 tx 的 buy 事件标记为 is_created_buy(创建者首次买入)
57    let is_created_buy = crate::logs::optimized_matcher::detect_pumpfun_create(&meta.log_messages);
58
59    // 构建账户查找表
60    let keys_len = msg.account_keys.len();
61    let writable_len = meta.loaded_writable_addresses.len();
62    let get_key = |i: usize| -> Option<&Vec<u8>> {
63        if i < keys_len {
64            msg.account_keys.get(i)
65        } else if i < keys_len + writable_len {
66            meta.loaded_writable_addresses.get(i - keys_len)
67        } else {
68            meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
69        }
70    };
71
72    let mut result = Vec::with_capacity(8);
73    let mut invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::with_capacity(8);
74
75    // 步骤 1: 解析所有主指令
76    for (i, ix) in msg.instructions.iter().enumerate() {
77        let pid = get_key(ix.program_id_index as usize)
78            .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
79
80        invokes.entry(pid).or_default().push((i as i32, -1));
81
82        // 解析主指令(8字节 discriminator)
83        if let Some(event) = parse_outer_instruction(
84            &ix.data,
85            &pid,
86            sig,
87            slot,
88            tx_idx,
89            block_us,
90            grpc_us,
91            &ix.accounts,
92            &get_key,
93            filter,
94            is_created_buy,
95        ) {
96            result.push((i, None, event)); // (outer_idx, inner_idx, event)
97        }
98    }
99
100    // 步骤 2: 解析所有 inner instructions
101    for inner in &meta.inner_instructions {
102        let outer_idx = inner.index as usize;
103
104        for (j, inner_ix) in inner.instructions.iter().enumerate() {
105            let pid = get_key(inner_ix.program_id_index as usize)
106                .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
107
108            invokes.entry(pid).or_default().push((outer_idx as i32, j as i32));
109
110            // 解析 inner instruction(16字节 discriminator)
111            if let Some(event) = parse_inner_instruction(
112                &inner_ix.data,
113                &pid,
114                sig,
115                slot,
116                tx_idx,
117                block_us,
118                grpc_us,
119                filter,
120                is_created_buy,
121            ) {
122                result.push((outer_idx, Some(j), event)); // (outer_idx, Some(inner_idx), event)
123            }
124        }
125    }
126
127    // 步骤 3: 合并相关事件(instruction + inner instruction)
128    let mut merged = merge_instruction_events(result);
129    enrich_pumpfun_same_tx_post_merge(&mut merged);
130
131    for e in merged.iter_mut() {
132        if let Some(m) = e.metadata_mut() {
133            m.recent_blockhash = recent_blockhash.clone();
134        }
135    }
136
137    // 步骤 4: 填充账户上下文(invokes 与 fill_data 均使用 Pubkey 键,无堆泄漏)
138    let mut final_result = Vec::with_capacity(merged.len());
139    for mut event in merged {
140        crate::core::account_dispatcher::fill_accounts_with_owned_keys(
141            &mut event,
142            meta,
143            transaction,
144            &invokes,
145        );
146        crate::core::common_filler::fill_data(&mut event, meta, transaction, &invokes);
147        final_result.push(event);
148    }
149
150    final_result
151}
152
153// ============================================================================
154// 辅助函数
155// ============================================================================
156
157/// 解析单个主指令(outer instruction)
158///
159/// 主指令使用 8 字节 discriminator
160#[inline(always)]
161fn parse_outer_instruction<'a>(
162    data: &[u8],
163    program_id: &Pubkey,
164    sig: Signature,
165    slot: u64,
166    tx_idx: u64,
167    block_us: Option<i64>,
168    grpc_us: i64,
169    account_indices: &[u8],
170    get_key: &dyn Fn(usize) -> Option<&'a Vec<u8>>,
171    filter: Option<&EventTypeFilter>,
172    _is_created_buy: bool,
173) -> Option<DexEvent> {
174    // 检查指令数据长度(至少8字节 discriminator)
175    if data.len() < 8 {
176        return None;
177    }
178
179    // 常见 DEX 指令账户数远小于 64;栈上缓冲避免每笔 outer 一次 Vec 分配
180    const STACK_CAP: usize = 64;
181    if account_indices.len() <= STACK_CAP {
182        let mut stack = [Pubkey::default(); STACK_CAP];
183        let mut n = 0usize;
184        for &idx in account_indices {
185            let k = get_key(idx as usize)?;
186            stack[n] = read_pubkey_fast(k);
187            n += 1;
188        }
189        crate::instr::parse_instruction_unified(
190            data,
191            &stack[..n],
192            sig,
193            slot,
194            tx_idx,
195            block_us,
196            grpc_us,
197            filter,
198            program_id,
199        )
200    } else {
201        let accounts: Vec<Pubkey> = account_indices
202            .iter()
203            .map(|&idx| get_key(idx as usize).map(|k| read_pubkey_fast(k)))
204            .collect::<Option<_>>()?;
205        crate::instr::parse_instruction_unified(
206            data, &accounts, sig, slot, tx_idx, block_us, grpc_us, filter, program_id,
207        )
208    }
209}
210
211/// 解析单个 inner instruction
212///
213/// Inner instructions 使用 16 字节 discriminator(前8字节是event hash,后8字节是magic)
214#[inline(always)]
215fn parse_inner_instruction(
216    data: &[u8],
217    program_id: &Pubkey,
218    sig: Signature,
219    slot: u64,
220    tx_idx: u64,
221    block_us: Option<i64>,
222    grpc_us: i64,
223    filter: Option<&EventTypeFilter>,
224    is_created_buy: bool,
225) -> Option<DexEvent> {
226    // 检查数据长度(至少16字节 discriminator)
227    if data.len() < 16 {
228        return None;
229    }
230
231    let metadata = EventMetadata {
232        signature: sig,
233        slot,
234        tx_index: tx_idx,
235        block_time_us: block_us.unwrap_or(0),
236        grpc_recv_us: grpc_us,
237        recent_blockhash: None, // set later on merged events in parse_instructions_enhanced
238    };
239
240    // 提取 16 字节 discriminator
241    let mut discriminator = [0u8; 16];
242    discriminator.copy_from_slice(&data[..16]);
243    let inner_data = &data[16..];
244
245    use crate::instr::{all_inner, program_ids, pump_amm_inner, pump_inner, raydium_clmm_inner};
246
247    // 根据 program_id 路由到对应的 inner instruction 解析器
248    let event = if *program_id == program_ids::PUMPFUN_PROGRAM_ID {
249        if let Some(f) = filter {
250            if !f.includes_pumpfun() {
251                return None;
252            }
253        }
254        pump_inner::parse_pumpfun_inner_instruction(
255            &discriminator,
256            inner_data,
257            metadata,
258            is_created_buy,
259        )
260    } else if *program_id == program_ids::PUMPSWAP_PROGRAM_ID {
261        if let Some(f) = filter {
262            if !f.includes_pumpswap() {
263                return None;
264            }
265        }
266        pump_amm_inner::parse_pumpswap_inner_instruction(&discriminator, inner_data, metadata)
267    } else if *program_id == program_ids::PUMP_FEES_PROGRAM_ID {
268        if let Some(f) = filter {
269            if !f.includes_pump_fees() {
270                return None;
271            }
272        }
273        all_inner::pump_fees::parse(&discriminator, inner_data, metadata)
274    } else if *program_id == program_ids::RAYDIUM_CLMM_PROGRAM_ID {
275        if let Some(f) = filter {
276            if !f.includes_raydium_clmm() {
277                return None;
278            }
279        }
280        raydium_clmm_inner::parse_raydium_clmm_inner_instruction(
281            &discriminator,
282            inner_data,
283            metadata,
284        )
285    } else if *program_id == program_ids::RAYDIUM_CPMM_PROGRAM_ID {
286        if let Some(f) = filter {
287            if !f.includes_raydium_cpmm() {
288                return None;
289            }
290        }
291        all_inner::raydium_cpmm::parse(&discriminator, inner_data, metadata)
292    } else if *program_id == program_ids::RAYDIUM_AMM_V4_PROGRAM_ID {
293        if let Some(f) = filter {
294            if !f.includes_raydium_amm_v4() {
295                return None;
296            }
297        }
298        all_inner::raydium_amm::parse(&discriminator, inner_data, metadata)
299    } else if *program_id == program_ids::ORCA_WHIRLPOOL_PROGRAM_ID {
300        if let Some(f) = filter {
301            if !f.includes_orca_whirlpool() {
302                return None;
303            }
304        }
305        all_inner::orca::parse(&discriminator, inner_data, metadata)
306    } else if *program_id == program_ids::METEORA_POOLS_PROGRAM_ID {
307        if let Some(f) = filter {
308            if !f.includes_meteora_pools() {
309                return None;
310            }
311        }
312        all_inner::meteora_amm::parse(&discriminator, inner_data, metadata)
313    } else if *program_id == program_ids::METEORA_DAMM_V2_PROGRAM_ID {
314        if let Some(f) = filter {
315            if !f.includes_meteora_damm_v2() {
316                return None;
317            }
318        }
319        all_inner::meteora_damm::parse(&discriminator, inner_data, metadata)
320    } else if *program_id == program_ids::METEORA_DLMM_PROGRAM_ID {
321        if let Some(f) = filter {
322            if !f.includes_meteora_dlmm() {
323                return None;
324            }
325        }
326        all_inner::meteora_dlmm::parse(&discriminator, inner_data, metadata)
327    } else if *program_id == program_ids::BONK_PROGRAM_ID {
328        if let Some(f) = filter {
329            if !f.includes_raydium_launchpad() {
330                return None;
331            }
332        }
333        all_inner::bonk::parse(&discriminator, inner_data, metadata)
334    } else {
335        None
336    };
337
338    if filter.map(|f| event.as_ref().is_some_and(|e| f.should_include_dex_event(e))).unwrap_or(true)
339    {
340        event
341    } else {
342        None
343    }
344}
345
346/// 合并相关的 instruction 和 inner instruction 事件
347///
348/// 合并策略:
349/// 1. 同一个 outer_idx 的 instruction 和 inner instruction 可以合并
350/// 2. Inner instruction 在 outer instruction 之后出现(排序保证主指令在前)
351/// 3. 同一 outer 下若有多个 inner,依次链式合并进同一条事件,再输出
352/// 4. 合并后返回更完整的事件
353#[inline(always)]
354fn merge_instruction_events(events: Vec<(usize, Option<usize>, DexEvent)>) -> Vec<DexEvent> {
355    if events.is_empty() {
356        return Vec::new();
357    }
358
359    // 按 (outer_idx, inner_idx) 排序,确保顺序:同一 outer 下 **主指令在前、inner 在后**
360    // (`None` 若用 MAX 会把 outer 排到 inner 后面,导致无法 merge)
361    let mut events = events;
362    events.sort_by_key(|(outer, inner, _)| (*outer, inner.map_or(0, |i| i + 1)));
363
364    let mut result = Vec::with_capacity(events.len());
365    let mut pending_outer: Option<(usize, DexEvent)> = None;
366
367    for (outer_idx, inner_idx, event) in events {
368        match inner_idx {
369            None => {
370                // 这是一个 outer instruction
371                // 先处理之前的 pending_outer
372                if let Some((_, outer_event)) = pending_outer.take() {
373                    result.push(outer_event);
374                }
375                // 保存当前的 outer instruction,等待可能的 inner instruction
376                pending_outer = Some((outer_idx, event));
377            }
378            Some(_) => {
379                // 这是一个 inner instruction
380                if let Some((pending_outer_idx, mut outer_event)) = pending_outer.take() {
381                    if pending_outer_idx == outer_idx {
382                        // 合并进当前 outer(可多次:多段 inner 链式叠在同一条事件上)
383                        merge_events(&mut outer_event, event);
384                        pending_outer = Some((outer_idx, outer_event));
385                    } else {
386                        // 不匹配,分别保留
387                        result.push(outer_event);
388                        result.push(event);
389                    }
390                } else {
391                    // 没有 pending outer,直接添加 inner event
392                    result.push(event);
393                }
394            }
395        }
396    }
397
398    // 处理最后一个 pending_outer
399    if let Some((_, outer_event)) = pending_outer {
400        result.push(outer_event);
401    }
402
403    result
404}
405
406/// 检查是否需要解析 instructions(根据 filter)
407#[inline(always)]
408fn should_parse_instructions(filter: Option<&EventTypeFilter>) -> bool {
409    // 如果没有 filter,总是解析
410    let Some(filter) = filter else { return true };
411
412    // 如果 filter.include_only 为空,总是解析
413    if filter.include_only.is_none() {
414        return true;
415    }
416
417    // PumpFun:outer BUY/SELL carries instruction args while inner/log TradeEvent
418    // carries executed amounts. Parse both and merge them by order.
419    if filter.includes_pumpfun() {
420        return true;
421    }
422
423    if filter.includes_pump_fees() {
424        return true;
425    }
426
427    filter.includes_pumpswap()
428        || filter.includes_raydium_launchpad()
429        || filter.includes_raydium_cpmm()
430        || filter.includes_raydium_clmm()
431        || filter.includes_raydium_amm_v4()
432        || filter.includes_orca_whirlpool()
433        || filter.includes_meteora_pools()
434        || filter.includes_meteora_damm_v2()
435        || filter.includes_meteora_dlmm()
436}
437
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grpc::types::EventType;

    /// Fixed metadata shared by the merge tests.
    fn sample_metadata() -> EventMetadata {
        use solana_sdk::signature::Signature;

        EventMetadata {
            signature: Signature::default(),
            slot: 100,
            tx_index: 1,
            block_time_us: 1000,
            grpc_recv_us: 2000,
            recent_blockhash: None,
        }
    }

    #[test]
    fn test_should_parse_instructions() {
        // No filter: should parse.
        assert!(should_parse_instructions(None));

        // Filter present but `include_only` unset: should parse.
        let unrestricted = EventTypeFilter { include_only: None, exclude_types: None };
        assert!(should_parse_instructions(Some(&unrestricted)));

        // Event type that requires instruction parsing.
        let migrate_only = EventTypeFilter {
            include_only: Some(vec![EventType::PumpFunMigrate]),
            exclude_types: None,
        };
        assert!(should_parse_instructions(Some(&migrate_only)));

        // PumpFun subscription: needs instruction + inner parsing so that a truncated log
        // does not drop one leg of the trade.
        let trade_only = EventTypeFilter {
            include_only: Some(vec![EventType::PumpFunTrade]),
            exclude_types: None,
        };
        assert!(should_parse_instructions(Some(&trade_only)));

        let protocol_event_types = [
            EventType::PumpSwapTrade,
            EventType::PumpFeesUpdateFeeShares,
            EventType::BonkTrade,
            EventType::RaydiumCpmmSwap,
            EventType::RaydiumClmmSwap,
            EventType::RaydiumAmmV4Swap,
            EventType::OrcaWhirlpoolSwap,
            EventType::MeteoraPoolsSwap,
            EventType::MeteoraDammV2Swap,
            EventType::MeteoraDlmmSwap,
        ];
        for event_type in protocol_event_types {
            let filter = EventTypeFilter::include_only(vec![event_type]);
            assert!(
                should_parse_instructions(Some(&filter)),
                "instruction parsing should be enabled for {event_type:?}"
            );
        }
    }

    #[test]
    fn test_merge_instruction_events() {
        // Scenario: one outer instruction plus one inner instruction, which should merge.
        let outer_event = DexEvent::PumpFunTrade(PumpFunTradeEvent {
            metadata: sample_metadata(),
            bonding_curve: Pubkey::new_unique(),
            ..Default::default()
        });

        let inner_event = DexEvent::PumpFunTrade(PumpFunTradeEvent {
            metadata: sample_metadata(),
            sol_amount: 1000,
            token_amount: 2000,
            ..Default::default()
        });

        let result = merge_instruction_events(vec![
            (0, None, outer_event),    // outer instruction at index 0
            (0, Some(0), inner_event), // inner instruction at index 0
        ]);

        // Both inputs collapse into a single event.
        assert_eq!(result.len(), 1);

        // The merged event carries data from both sides.
        let DexEvent::PumpFunTrade(trade) = &result[0] else {
            panic!("Expected PumpFunTrade event");
        };
        assert_eq!(trade.sol_amount, 1000); // from inner
        assert_eq!(trade.token_amount, 2000); // from inner
        assert_ne!(trade.bonding_curve, Pubkey::default()); // from outer
    }

    #[test]
    fn test_merge_instruction_events_chains_multiple_inners_same_outer() {
        let bc = Pubkey::new_unique();
        let fee = Pubkey::new_unique();

        let outer_event = DexEvent::PumpFunTrade(PumpFunTradeEvent {
            metadata: sample_metadata(),
            bonding_curve: bc,
            ..Default::default()
        });

        let inner_trade = DexEvent::PumpFunTrade(PumpFunTradeEvent {
            metadata: sample_metadata(),
            sol_amount: 1000,
            token_amount: 2000,
            is_buy: true,
            ..Default::default()
        });

        // The second inner event only carries `fee_recipient` and no amounts; it must not
        // wipe the amounts contributed by the first inner event.
        let inner_fee_only = DexEvent::PumpFunTrade(PumpFunTradeEvent {
            metadata: sample_metadata(),
            fee_recipient: fee,
            ..Default::default()
        });

        let result = merge_instruction_events(vec![
            (0, None, outer_event),
            (0, Some(0), inner_trade),
            (0, Some(1), inner_fee_only),
        ]);

        assert_eq!(result.len(), 1);
        let DexEvent::PumpFunTrade(trade) = &result[0] else {
            panic!("Expected PumpFunTrade event");
        };
        assert_eq!(trade.bonding_curve, bc);
        assert_eq!(trade.sol_amount, 1000);
        assert_eq!(trade.token_amount, 2000);
        assert_eq!(trade.fee_recipient, fee);
    }
}