Skip to main content

sol_parser_sdk/
rpc_parser.rs

1//! RPC Transaction Parser
2//!
3//! 提供独立的 RPC 交易解析功能,不依赖 gRPC streaming
4//! 可以用于测试验证和离线分析
5
6use crate::core::events::DexEvent;
7use crate::grpc::instruction_parser::parse_instructions_enhanced;
8use crate::grpc::types::EventTypeFilter;
9use crate::instr::read_pubkey_fast;
10use base64::{engine::general_purpose, Engine as _};
11use solana_client::rpc_client::RpcClient;
12use solana_client::rpc_config::RpcTransactionConfig;
13use solana_sdk::pubkey::Pubkey;
14use solana_sdk::signature::Signature;
15use solana_transaction_status::{
16    EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction, UiTransactionEncoding,
17};
18use std::collections::HashMap;
19use std::str::FromStr;
20use yellowstone_grpc_proto::prelude::{
21    CompiledInstruction, InnerInstruction, InnerInstructions, Message, MessageAddressTableLookup,
22    MessageHeader, Transaction, TransactionStatusMeta,
23};
24
25/// Parse a transaction from RPC by signature
26///
27/// # Arguments
28/// * `rpc_client` - RPC client to fetch the transaction
29/// * `signature` - Transaction signature
30/// * `filter` - Optional event type filter
31///
32/// # Returns
33/// Vector of parsed DEX events
34///
35/// # Example
36/// ```no_run
37/// use solana_client::rpc_client::RpcClient;
38/// use solana_sdk::signature::Signature;
39/// use sol_parser_sdk::parse_transaction_from_rpc;
40/// use std::str::FromStr;
41///
42/// let client = RpcClient::new("https://api.mainnet-beta.solana.com".to_string());
43/// let sig = Signature::from_str("your-signature-here").unwrap();
44/// let events = parse_transaction_from_rpc(&client, &sig, None).unwrap();
45/// ```
46pub fn parse_transaction_from_rpc(
47    rpc_client: &RpcClient,
48    signature: &Signature,
49    filter: Option<&EventTypeFilter>,
50) -> Result<Vec<DexEvent>, ParseError> {
51    // Fetch transaction from RPC with V0 transaction support
52    let config = RpcTransactionConfig {
53        encoding: Some(UiTransactionEncoding::Base64),
54        commitment: None,
55        max_supported_transaction_version: Some(0),
56    };
57
58    let rpc_tx = rpc_client.get_transaction_with_config(signature, config).map_err(|e| {
59        let msg = e.to_string();
60        if msg.contains("invalid type: null") && msg.contains("EncodedConfirmedTransactionWithStatusMeta") {
61            ParseError::RpcError(format!(
62                "Transaction not found (RPC returned null). Common causes: 1) Transaction is too old and pruned (use an archive RPC). 2) Wrong network or invalid signature. Try SOLANA_RPC_URL with an archive endpoint (e.g. Helius, QuickNode) or a more recent tx. Original: {}",
63                msg
64            ))
65        } else {
66            ParseError::RpcError(msg)
67        }
68    })?;
69
70    parse_rpc_transaction(&rpc_tx, filter)
71}
72
73/// Parse a RPC transaction structure
74///
75/// # Arguments
76/// * `rpc_tx` - RPC transaction to parse
77/// * `filter` - Optional event type filter
78///
79/// # Returns
80/// Vector of parsed DEX events
81///
82/// # Example
83/// ```no_run
84/// use sol_parser_sdk::parse_rpc_transaction;
85///
86/// // Assuming you have an rpc_tx from RPC
87/// // let events = parse_rpc_transaction(&rpc_tx, None).unwrap();
88/// ```
89pub fn parse_rpc_transaction(
90    rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
91    filter: Option<&EventTypeFilter>,
92) -> Result<Vec<DexEvent>, ParseError> {
93    // Convert RPC format to gRPC format
94    let (grpc_meta, grpc_tx) = convert_rpc_to_grpc(rpc_tx)?;
95
96    // Extract metadata
97    let signature = extract_signature(rpc_tx)?;
98    let slot = rpc_tx.slot;
99    let block_time_us = rpc_tx.block_time.map(|t| t * 1_000_000);
100    let grpc_recv_us =
101        std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_micros()
102            as i64;
103
104    // Wrap grpc_tx in Option for reuse
105    let grpc_tx_opt = Some(grpc_tx);
106
107    let recent_blockhash = grpc_tx_opt.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
108        if m.recent_blockhash.is_empty() {
109            None
110        } else {
111            Some(m.recent_blockhash.clone())
112        }
113    });
114
115    let mut program_invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::new();
116
117    if let Some(ref tx) = grpc_tx_opt {
118        if let Some(ref msg) = tx.message {
119            let keys_len = msg.account_keys.len();
120            let writable_len = grpc_meta.loaded_writable_addresses.len();
121            let get_key = |i: usize| -> Option<&Vec<u8>> {
122                if i < keys_len {
123                    msg.account_keys.get(i)
124                } else if i < keys_len + writable_len {
125                    grpc_meta.loaded_writable_addresses.get(i - keys_len)
126                } else {
127                    grpc_meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
128                }
129            };
130
131            for (i, ix) in msg.instructions.iter().enumerate() {
132                let pid = get_key(ix.program_id_index as usize)
133                    .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
134                program_invokes.entry(pid).or_default().push((i as i32, -1));
135            }
136
137            for inner in &grpc_meta.inner_instructions {
138                let outer_idx = inner.index as usize;
139                for (j, inner_ix) in inner.instructions.iter().enumerate() {
140                    let pid = get_key(inner_ix.program_id_index as usize)
141                        .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
142                    program_invokes.entry(pid).or_default().push((outer_idx as i32, j as i32));
143                }
144            }
145        }
146    }
147
148    // Parse instructions
149    let instr_events = parse_instructions_enhanced(
150        &grpc_meta,
151        &grpc_tx_opt,
152        signature,
153        slot,
154        0, // tx_idx
155        block_time_us,
156        grpc_recv_us,
157        filter,
158    );
159
160    // Parse logs (for protocols like PumpFun that emit events in logs)
161    let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
162    let is_created_buy = needs_pumpfun
163        && crate::logs::optimized_matcher::detect_pumpfun_create(&grpc_meta.log_messages);
164    let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
165    let mut log_events = Vec::new();
166
167    for log in &grpc_meta.log_messages {
168        if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
169            if let Ok(pk) = Pubkey::from_str(pid) {
170                active_program_stack.truncate(depth.saturating_sub(1));
171                active_program_stack.push(pk);
172            }
173        }
174
175        if let Some(mut event) = crate::logs::parse_log_with_program_id(
176            log,
177            signature,
178            slot,
179            0, // tx_index
180            block_time_us,
181            grpc_recv_us,
182            filter,
183            is_created_buy,
184            recent_blockhash.as_deref(),
185            active_program_stack.last(),
186        ) {
187            // Fill account fields - use same function as gRPC parsing
188            crate::core::account_dispatcher::fill_accounts_with_owned_keys(
189                &mut event,
190                &grpc_meta,
191                &grpc_tx_opt,
192                &program_invokes,
193            );
194
195            // Fill additional data fields (e.g., PumpSwap is_pump_pool)
196            crate::core::common_filler::fill_data(
197                &mut event,
198                &grpc_meta,
199                &grpc_tx_opt,
200                &program_invokes,
201            );
202
203            log_events.push(event);
204        }
205
206        if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
207            if let Ok(pk) = Pubkey::from_str(pid) {
208                if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
209                    active_program_stack.truncate(pos);
210                }
211            }
212        }
213    }
214
215    Ok(merge_log_and_instruction_events(log_events, instr_events))
216}
217
218fn merge_log_and_instruction_events(
219    log_events: Vec<DexEvent>,
220    instr_events: Vec<DexEvent>,
221) -> Vec<DexEvent> {
222    crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
223}
224
225/// Parse error types
226#[derive(Debug)]
227pub enum ParseError {
228    RpcError(String),
229    ConversionError(String),
230    MissingField(String),
231}
232
233impl std::fmt::Display for ParseError {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        match self {
236            ParseError::RpcError(msg) => write!(f, "RPC error: {}", msg),
237            ParseError::ConversionError(msg) => write!(f, "Conversion error: {}", msg),
238            ParseError::MissingField(msg) => write!(f, "Missing field: {}", msg),
239        }
240    }
241}
242
243impl std::error::Error for ParseError {}
244
245// ============================================================================
246// Internal conversion functions
247// ============================================================================
248
249fn extract_signature(
250    rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
251) -> Result<Signature, ParseError> {
252    let ui_tx = &rpc_tx.transaction.transaction;
253
254    match ui_tx {
255        EncodedTransaction::Binary(data, _encoding) => {
256            let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
257                ParseError::ConversionError(format!("Failed to decode base64: {}", e))
258            })?;
259
260            let versioned_tx: solana_sdk::transaction::VersionedTransaction =
261                bincode::deserialize(&bytes).map_err(|e| {
262                    ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
263                })?;
264
265            Ok(versioned_tx.signatures[0])
266        }
267        _ => Err(ParseError::ConversionError("Unsupported transaction encoding".to_string())),
268    }
269}
270
271pub fn convert_rpc_to_grpc(
272    rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
273) -> Result<(TransactionStatusMeta, Transaction), ParseError> {
274    let rpc_meta = rpc_tx
275        .transaction
276        .meta
277        .as_ref()
278        .ok_or_else(|| ParseError::MissingField("meta".to_string()))?;
279
280    // Convert meta
281    let mut grpc_meta = TransactionStatusMeta {
282        err: None,
283        fee: rpc_meta.fee,
284        pre_balances: rpc_meta.pre_balances.clone(),
285        post_balances: rpc_meta.post_balances.clone(),
286        inner_instructions: Vec::new(),
287        log_messages: {
288            let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
289            opt.unwrap_or_default()
290        },
291        pre_token_balances: Vec::new(),
292        post_token_balances: Vec::new(),
293        rewards: Vec::new(),
294        loaded_writable_addresses: {
295            let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
296                rpc_meta.loaded_addresses.clone().into();
297            loaded_opt
298                .map(|addrs| {
299                    addrs
300                        .writable
301                        .iter()
302                        .map(|pk_str| {
303                            use std::str::FromStr;
304                            solana_sdk::pubkey::Pubkey::from_str(pk_str)
305                                .unwrap()
306                                .to_bytes()
307                                .to_vec()
308                        })
309                        .collect()
310                })
311                .unwrap_or_default()
312        },
313        loaded_readonly_addresses: {
314            let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
315                rpc_meta.loaded_addresses.clone().into();
316            loaded_opt
317                .map(|addrs| {
318                    addrs
319                        .readonly
320                        .iter()
321                        .map(|pk_str| {
322                            use std::str::FromStr;
323                            solana_sdk::pubkey::Pubkey::from_str(pk_str)
324                                .unwrap()
325                                .to_bytes()
326                                .to_vec()
327                        })
328                        .collect()
329                })
330                .unwrap_or_default()
331        },
332        return_data: None,
333        compute_units_consumed: rpc_meta.compute_units_consumed.clone().into(),
334
335        inner_instructions_none: {
336            let opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
337            opt.is_none()
338        },
339        log_messages_none: {
340            let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
341            opt.is_none()
342        },
343        return_data_none: {
344            let opt: Option<solana_transaction_status::UiTransactionReturnData> =
345                rpc_meta.return_data.clone().into();
346            opt.is_none()
347        },
348        cost_units: rpc_meta.compute_units_consumed.clone().into(),
349    };
350
351    // Convert inner instructions
352    let inner_instructions_opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
353    if let Some(ref inner_instructions) = inner_instructions_opt {
354        for inner in inner_instructions {
355            let mut grpc_inner =
356                InnerInstructions { index: inner.index as u32, instructions: Vec::new() };
357
358            for ix in &inner.instructions {
359                if let solana_transaction_status::UiInstruction::Compiled(compiled) = ix {
360                    // Decode base58 data
361                    let data = bs58::decode(&compiled.data).into_vec().map_err(|e| {
362                        ParseError::ConversionError(format!(
363                            "Failed to decode instruction data: {}",
364                            e
365                        ))
366                    })?;
367
368                    grpc_inner.instructions.push(InnerInstruction {
369                        program_id_index: compiled.program_id_index as u32,
370                        accounts: compiled.accounts.clone(),
371                        data,
372                        stack_height: compiled.stack_height,
373                    });
374                }
375            }
376
377            grpc_meta.inner_instructions.push(grpc_inner);
378        }
379    }
380
381    // Convert transaction
382    let ui_tx = &rpc_tx.transaction.transaction;
383
384    let (message, signatures) = match ui_tx {
385        EncodedTransaction::Binary(data, _encoding) => {
386            // Decode base64
387            let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
388                ParseError::ConversionError(format!("Failed to decode base64: {}", e))
389            })?;
390
391            // Parse as versioned transaction
392            let versioned_tx: solana_sdk::transaction::VersionedTransaction =
393                bincode::deserialize(&bytes).map_err(|e| {
394                    ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
395                })?;
396
397            let sigs: Vec<Vec<u8>> =
398                versioned_tx.signatures.iter().map(|s| s.as_ref().to_vec()).collect();
399
400            let message = match versioned_tx.message {
401                solana_sdk::message::VersionedMessage::Legacy(legacy_msg) => {
402                    convert_legacy_message(&legacy_msg)?
403                }
404                solana_sdk::message::VersionedMessage::V0(v0_msg) => convert_v0_message(&v0_msg)?,
405            };
406
407            (message, sigs)
408        }
409        EncodedTransaction::Json(_) => {
410            return Err(ParseError::ConversionError(
411                "JSON encoded transactions not supported yet".to_string(),
412            ));
413        }
414        _ => {
415            return Err(ParseError::ConversionError(
416                "Unsupported transaction encoding".to_string(),
417            ));
418        }
419    };
420
421    let grpc_tx = Transaction { signatures, message: Some(message) };
422
423    Ok((grpc_meta, grpc_tx))
424}
425
426fn convert_legacy_message(
427    msg: &solana_sdk::message::legacy::Message,
428) -> Result<Message, ParseError> {
429    let account_keys: Vec<Vec<u8>> =
430        msg.account_keys.iter().map(|k| k.to_bytes().to_vec()).collect();
431
432    let instructions: Vec<CompiledInstruction> = msg
433        .instructions
434        .iter()
435        .map(|ix| CompiledInstruction {
436            program_id_index: ix.program_id_index as u32,
437            accounts: ix.accounts.clone(),
438            data: ix.data.clone(),
439        })
440        .collect();
441
442    Ok(Message {
443        header: Some(MessageHeader {
444            num_required_signatures: msg.header.num_required_signatures as u32,
445            num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
446            num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
447        }),
448        account_keys,
449        recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
450        instructions,
451        versioned: false,
452        address_table_lookups: Vec::new(),
453    })
454}
455
456fn convert_v0_message(msg: &solana_sdk::message::v0::Message) -> Result<Message, ParseError> {
457    let account_keys: Vec<Vec<u8>> =
458        msg.account_keys.iter().map(|k| k.to_bytes().to_vec()).collect();
459
460    let instructions: Vec<CompiledInstruction> = msg
461        .instructions
462        .iter()
463        .map(|ix| CompiledInstruction {
464            program_id_index: ix.program_id_index as u32,
465            accounts: ix.accounts.clone(),
466            data: ix.data.clone(),
467        })
468        .collect();
469
470    Ok(Message {
471        header: Some(MessageHeader {
472            num_required_signatures: msg.header.num_required_signatures as u32,
473            num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
474            num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
475        }),
476        account_keys,
477        recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
478        instructions,
479        versioned: true,
480        address_table_lookups: msg
481            .address_table_lookups
482            .iter()
483            .map(|lookup| MessageAddressTableLookup {
484                account_key: lookup.account_key.to_bytes().to_vec(),
485                writable_indexes: lookup.writable_indexes.clone(),
486                readonly_indexes: lookup.readonly_indexes.clone(),
487            })
488            .collect(),
489    })
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495    use crate::core::events::{DexEvent, EventMetadata, PumpSwapCreatePoolEvent};
496    use solana_sdk::{pubkey::Pubkey, signature::Signature};
497
498    fn dummy_meta() -> EventMetadata {
499        EventMetadata {
500            signature: Signature::default(),
501            slot: 1,
502            tx_index: 0,
503            block_time_us: 0,
504            grpc_recv_us: 0,
505            recent_blockhash: None,
506        }
507    }
508
509    #[test]
510    fn rpc_merge_keeps_instruction_cashback_for_log_only_pumpswap_create_pool() {
511        let pool = Pubkey::new_unique();
512        let base_mint = Pubkey::new_unique();
513        let quote_mint = Pubkey::new_unique();
514
515        let log_create = PumpSwapCreatePoolEvent {
516            metadata: dummy_meta(),
517            pool,
518            base_mint,
519            quote_mint,
520            is_cashback_coin: false,
521            ..Default::default()
522        };
523        let ix_create = PumpSwapCreatePoolEvent {
524            metadata: dummy_meta(),
525            pool,
526            base_mint,
527            quote_mint,
528            is_cashback_coin: true,
529            ..Default::default()
530        };
531
532        let merged = merge_log_and_instruction_events(
533            vec![DexEvent::PumpSwapCreatePool(log_create)],
534            vec![DexEvent::PumpSwapCreatePool(ix_create)],
535        );
536
537        assert_eq!(merged.len(), 1);
538        match &merged[0] {
539            DexEvent::PumpSwapCreatePool(e) => assert!(e.is_cashback_coin),
540            other => panic!("expected PumpSwapCreatePool, got {other:?}"),
541        }
542    }
543}