1use crate::core::events::DexEvent;
7use crate::grpc::instruction_parser::parse_instructions_enhanced;
8use crate::grpc::types::EventTypeFilter;
9use crate::instr::read_pubkey_fast;
10use base64::{engine::general_purpose, Engine as _};
11use solana_client::rpc_client::RpcClient;
12use solana_client::rpc_config::RpcTransactionConfig;
13use solana_sdk::pubkey::Pubkey;
14use solana_sdk::signature::Signature;
15use solana_transaction_status::{
16 EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction, UiTransactionEncoding,
17};
18use std::collections::HashMap;
19use std::str::FromStr;
20use yellowstone_grpc_proto::prelude::{
21 CompiledInstruction, InnerInstruction, InnerInstructions, Message, MessageAddressTableLookup,
22 MessageHeader, Transaction, TransactionStatusMeta,
23};
24
25pub fn parse_transaction_from_rpc(
47 rpc_client: &RpcClient,
48 signature: &Signature,
49 filter: Option<&EventTypeFilter>,
50) -> Result<Vec<DexEvent>, ParseError> {
51 let config = RpcTransactionConfig {
53 encoding: Some(UiTransactionEncoding::Base64),
54 commitment: None,
55 max_supported_transaction_version: Some(0),
56 };
57
58 let rpc_tx = rpc_client.get_transaction_with_config(signature, config).map_err(|e| {
59 let msg = e.to_string();
60 if msg.contains("invalid type: null") && msg.contains("EncodedConfirmedTransactionWithStatusMeta") {
61 ParseError::RpcError(format!(
62 "Transaction not found (RPC returned null). Common causes: 1) Transaction is too old and pruned (use an archive RPC). 2) Wrong network or invalid signature. Try SOLANA_RPC_URL with an archive endpoint (e.g. Helius, QuickNode) or a more recent tx. Original: {}",
63 msg
64 ))
65 } else {
66 ParseError::RpcError(msg)
67 }
68 })?;
69
70 parse_rpc_transaction(&rpc_tx, filter)
71}
72
73pub fn parse_rpc_transaction(
90 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
91 filter: Option<&EventTypeFilter>,
92) -> Result<Vec<DexEvent>, ParseError> {
93 let (grpc_meta, grpc_tx) = convert_rpc_to_grpc(rpc_tx)?;
95
96 let signature = extract_signature(rpc_tx)?;
98 let slot = rpc_tx.slot;
99 let block_time_us = rpc_tx.block_time.map(|t| t * 1_000_000);
100 let grpc_recv_us =
101 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_micros()
102 as i64;
103
104 let grpc_tx_opt = Some(grpc_tx);
106
107 let recent_blockhash = grpc_tx_opt.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
108 if m.recent_blockhash.is_empty() {
109 None
110 } else {
111 Some(m.recent_blockhash.clone())
112 }
113 });
114
115 let mut program_invokes: HashMap<Pubkey, Vec<(i32, i32)>> = HashMap::new();
116
117 if let Some(ref tx) = grpc_tx_opt {
118 if let Some(ref msg) = tx.message {
119 let keys_len = msg.account_keys.len();
120 let writable_len = grpc_meta.loaded_writable_addresses.len();
121 let get_key = |i: usize| -> Option<&Vec<u8>> {
122 if i < keys_len {
123 msg.account_keys.get(i)
124 } else if i < keys_len + writable_len {
125 grpc_meta.loaded_writable_addresses.get(i - keys_len)
126 } else {
127 grpc_meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
128 }
129 };
130
131 for (i, ix) in msg.instructions.iter().enumerate() {
132 let pid = get_key(ix.program_id_index as usize)
133 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
134 program_invokes.entry(pid).or_default().push((i as i32, -1));
135 }
136
137 for inner in &grpc_meta.inner_instructions {
138 let outer_idx = inner.index as usize;
139 for (j, inner_ix) in inner.instructions.iter().enumerate() {
140 let pid = get_key(inner_ix.program_id_index as usize)
141 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
142 program_invokes.entry(pid).or_default().push((outer_idx as i32, j as i32));
143 }
144 }
145 }
146 }
147
148 let instr_events = parse_instructions_enhanced(
150 &grpc_meta,
151 &grpc_tx_opt,
152 signature,
153 slot,
154 0, block_time_us,
156 grpc_recv_us,
157 filter,
158 );
159
160 let needs_pumpfun = filter.map(|f| f.includes_pumpfun()).unwrap_or(true);
162 let is_created_buy = needs_pumpfun
163 && crate::logs::optimized_matcher::detect_pumpfun_create(&grpc_meta.log_messages);
164 let mut active_program_stack: Vec<Pubkey> = Vec::with_capacity(8);
165 let mut log_events = Vec::new();
166
167 for log in &grpc_meta.log_messages {
168 if let Some((pid, depth)) = crate::logs::optimized_matcher::parse_invoke_info(log) {
169 if let Ok(pk) = Pubkey::from_str(pid) {
170 active_program_stack.truncate(depth.saturating_sub(1));
171 active_program_stack.push(pk);
172 }
173 }
174
175 if let Some(mut event) = crate::logs::parse_log_with_program_id(
176 log,
177 signature,
178 slot,
179 0, block_time_us,
181 grpc_recv_us,
182 filter,
183 is_created_buy,
184 recent_blockhash.as_deref(),
185 active_program_stack.last(),
186 ) {
187 crate::core::account_dispatcher::fill_accounts_with_owned_keys(
189 &mut event,
190 &grpc_meta,
191 &grpc_tx_opt,
192 &program_invokes,
193 );
194
195 crate::core::common_filler::fill_data(
197 &mut event,
198 &grpc_meta,
199 &grpc_tx_opt,
200 &program_invokes,
201 );
202
203 log_events.push(event);
204 }
205
206 if let Some(pid) = crate::logs::optimized_matcher::parse_program_complete_info(log) {
207 if let Ok(pk) = Pubkey::from_str(pid) {
208 if let Some(pos) = active_program_stack.iter().rposition(|active| *active == pk) {
209 active_program_stack.truncate(pos);
210 }
211 }
212 }
213 }
214
215 Ok(merge_log_and_instruction_events(log_events, instr_events))
216}
217
218fn merge_log_and_instruction_events(
219 log_events: Vec<DexEvent>,
220 instr_events: Vec<DexEvent>,
221) -> Vec<DexEvent> {
222 crate::grpc::log_instr_dedup::dedupe_log_instruction_events(log_events, instr_events)
223}
224
225#[derive(Debug)]
227pub enum ParseError {
228 RpcError(String),
229 ConversionError(String),
230 MissingField(String),
231}
232
233impl std::fmt::Display for ParseError {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 match self {
236 ParseError::RpcError(msg) => write!(f, "RPC error: {}", msg),
237 ParseError::ConversionError(msg) => write!(f, "Conversion error: {}", msg),
238 ParseError::MissingField(msg) => write!(f, "Missing field: {}", msg),
239 }
240 }
241}
242
243impl std::error::Error for ParseError {}
244
245fn extract_signature(
250 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
251) -> Result<Signature, ParseError> {
252 let ui_tx = &rpc_tx.transaction.transaction;
253
254 match ui_tx {
255 EncodedTransaction::Binary(data, _encoding) => {
256 let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
257 ParseError::ConversionError(format!("Failed to decode base64: {}", e))
258 })?;
259
260 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
261 bincode::deserialize(&bytes).map_err(|e| {
262 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
263 })?;
264
265 Ok(versioned_tx.signatures[0])
266 }
267 _ => Err(ParseError::ConversionError("Unsupported transaction encoding".to_string())),
268 }
269}
270
271pub fn convert_rpc_to_grpc(
272 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
273) -> Result<(TransactionStatusMeta, Transaction), ParseError> {
274 let rpc_meta = rpc_tx
275 .transaction
276 .meta
277 .as_ref()
278 .ok_or_else(|| ParseError::MissingField("meta".to_string()))?;
279
280 let mut grpc_meta = TransactionStatusMeta {
282 err: None,
283 fee: rpc_meta.fee,
284 pre_balances: rpc_meta.pre_balances.clone(),
285 post_balances: rpc_meta.post_balances.clone(),
286 inner_instructions: Vec::new(),
287 log_messages: {
288 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
289 opt.unwrap_or_default()
290 },
291 pre_token_balances: Vec::new(),
292 post_token_balances: Vec::new(),
293 rewards: Vec::new(),
294 loaded_writable_addresses: {
295 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
296 rpc_meta.loaded_addresses.clone().into();
297 loaded_opt
298 .map(|addrs| {
299 addrs
300 .writable
301 .iter()
302 .map(|pk_str| {
303 use std::str::FromStr;
304 solana_sdk::pubkey::Pubkey::from_str(pk_str)
305 .unwrap()
306 .to_bytes()
307 .to_vec()
308 })
309 .collect()
310 })
311 .unwrap_or_default()
312 },
313 loaded_readonly_addresses: {
314 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
315 rpc_meta.loaded_addresses.clone().into();
316 loaded_opt
317 .map(|addrs| {
318 addrs
319 .readonly
320 .iter()
321 .map(|pk_str| {
322 use std::str::FromStr;
323 solana_sdk::pubkey::Pubkey::from_str(pk_str)
324 .unwrap()
325 .to_bytes()
326 .to_vec()
327 })
328 .collect()
329 })
330 .unwrap_or_default()
331 },
332 return_data: None,
333 compute_units_consumed: rpc_meta.compute_units_consumed.clone().into(),
334
335 inner_instructions_none: {
336 let opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
337 opt.is_none()
338 },
339 log_messages_none: {
340 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
341 opt.is_none()
342 },
343 return_data_none: {
344 let opt: Option<solana_transaction_status::UiTransactionReturnData> =
345 rpc_meta.return_data.clone().into();
346 opt.is_none()
347 },
348 cost_units: rpc_meta.compute_units_consumed.clone().into(),
349 };
350
351 let inner_instructions_opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
353 if let Some(ref inner_instructions) = inner_instructions_opt {
354 for inner in inner_instructions {
355 let mut grpc_inner =
356 InnerInstructions { index: inner.index as u32, instructions: Vec::new() };
357
358 for ix in &inner.instructions {
359 if let solana_transaction_status::UiInstruction::Compiled(compiled) = ix {
360 let data = bs58::decode(&compiled.data).into_vec().map_err(|e| {
362 ParseError::ConversionError(format!(
363 "Failed to decode instruction data: {}",
364 e
365 ))
366 })?;
367
368 grpc_inner.instructions.push(InnerInstruction {
369 program_id_index: compiled.program_id_index as u32,
370 accounts: compiled.accounts.clone(),
371 data,
372 stack_height: compiled.stack_height,
373 });
374 }
375 }
376
377 grpc_meta.inner_instructions.push(grpc_inner);
378 }
379 }
380
381 let ui_tx = &rpc_tx.transaction.transaction;
383
384 let (message, signatures) = match ui_tx {
385 EncodedTransaction::Binary(data, _encoding) => {
386 let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
388 ParseError::ConversionError(format!("Failed to decode base64: {}", e))
389 })?;
390
391 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
393 bincode::deserialize(&bytes).map_err(|e| {
394 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
395 })?;
396
397 let sigs: Vec<Vec<u8>> =
398 versioned_tx.signatures.iter().map(|s| s.as_ref().to_vec()).collect();
399
400 let message = match versioned_tx.message {
401 solana_sdk::message::VersionedMessage::Legacy(legacy_msg) => {
402 convert_legacy_message(&legacy_msg)?
403 }
404 solana_sdk::message::VersionedMessage::V0(v0_msg) => convert_v0_message(&v0_msg)?,
405 };
406
407 (message, sigs)
408 }
409 EncodedTransaction::Json(_) => {
410 return Err(ParseError::ConversionError(
411 "JSON encoded transactions not supported yet".to_string(),
412 ));
413 }
414 _ => {
415 return Err(ParseError::ConversionError(
416 "Unsupported transaction encoding".to_string(),
417 ));
418 }
419 };
420
421 let grpc_tx = Transaction { signatures, message: Some(message) };
422
423 Ok((grpc_meta, grpc_tx))
424}
425
426fn convert_legacy_message(
427 msg: &solana_sdk::message::legacy::Message,
428) -> Result<Message, ParseError> {
429 let account_keys: Vec<Vec<u8>> =
430 msg.account_keys.iter().map(|k| k.to_bytes().to_vec()).collect();
431
432 let instructions: Vec<CompiledInstruction> = msg
433 .instructions
434 .iter()
435 .map(|ix| CompiledInstruction {
436 program_id_index: ix.program_id_index as u32,
437 accounts: ix.accounts.clone(),
438 data: ix.data.clone(),
439 })
440 .collect();
441
442 Ok(Message {
443 header: Some(MessageHeader {
444 num_required_signatures: msg.header.num_required_signatures as u32,
445 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
446 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
447 }),
448 account_keys,
449 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
450 instructions,
451 versioned: false,
452 address_table_lookups: Vec::new(),
453 })
454}
455
456fn convert_v0_message(msg: &solana_sdk::message::v0::Message) -> Result<Message, ParseError> {
457 let account_keys: Vec<Vec<u8>> =
458 msg.account_keys.iter().map(|k| k.to_bytes().to_vec()).collect();
459
460 let instructions: Vec<CompiledInstruction> = msg
461 .instructions
462 .iter()
463 .map(|ix| CompiledInstruction {
464 program_id_index: ix.program_id_index as u32,
465 accounts: ix.accounts.clone(),
466 data: ix.data.clone(),
467 })
468 .collect();
469
470 Ok(Message {
471 header: Some(MessageHeader {
472 num_required_signatures: msg.header.num_required_signatures as u32,
473 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
474 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
475 }),
476 account_keys,
477 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
478 instructions,
479 versioned: true,
480 address_table_lookups: msg
481 .address_table_lookups
482 .iter()
483 .map(|lookup| MessageAddressTableLookup {
484 account_key: lookup.account_key.to_bytes().to_vec(),
485 writable_indexes: lookup.writable_indexes.clone(),
486 readonly_indexes: lookup.readonly_indexes.clone(),
487 })
488 .collect(),
489 })
490}
491
492#[cfg(test)]
493mod tests {
494 use super::*;
495 use crate::core::events::{DexEvent, EventMetadata, PumpSwapCreatePoolEvent};
496 use solana_sdk::{pubkey::Pubkey, signature::Signature};
497
498 fn dummy_meta() -> EventMetadata {
499 EventMetadata {
500 signature: Signature::default(),
501 slot: 1,
502 tx_index: 0,
503 block_time_us: 0,
504 grpc_recv_us: 0,
505 recent_blockhash: None,
506 }
507 }
508
509 #[test]
510 fn rpc_merge_keeps_instruction_cashback_for_log_only_pumpswap_create_pool() {
511 let pool = Pubkey::new_unique();
512 let base_mint = Pubkey::new_unique();
513 let quote_mint = Pubkey::new_unique();
514
515 let log_create = PumpSwapCreatePoolEvent {
516 metadata: dummy_meta(),
517 pool,
518 base_mint,
519 quote_mint,
520 is_cashback_coin: false,
521 ..Default::default()
522 };
523 let ix_create = PumpSwapCreatePoolEvent {
524 metadata: dummy_meta(),
525 pool,
526 base_mint,
527 quote_mint,
528 is_cashback_coin: true,
529 ..Default::default()
530 };
531
532 let merged = merge_log_and_instruction_events(
533 vec![DexEvent::PumpSwapCreatePool(log_create)],
534 vec![DexEvent::PumpSwapCreatePool(ix_create)],
535 );
536
537 assert_eq!(merged.len(), 1);
538 match &merged[0] {
539 DexEvent::PumpSwapCreatePool(e) => assert!(e.is_cashback_coin),
540 other => panic!("expected PumpSwapCreatePool, got {other:?}"),
541 }
542 }
543}