1use crate::core::events::DexEvent;
7use crate::grpc::instruction_parser::parse_instructions_enhanced;
8use crate::grpc::types::EventTypeFilter;
9use crate::instr::read_pubkey_fast;
10use base64::{Engine as _, engine::general_purpose};
11use solana_client::rpc_client::RpcClient;
12use solana_client::rpc_config::RpcTransactionConfig;
13use solana_sdk::signature::Signature;
14use solana_sdk::pubkey::Pubkey;
15use solana_transaction_status::{
16 EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction, UiTransactionEncoding,
17};
18use std::collections::HashMap;
19use yellowstone_grpc_proto::prelude::{
20 CompiledInstruction, InnerInstruction, InnerInstructions, Message, MessageAddressTableLookup, MessageHeader,
21 Transaction, TransactionStatusMeta,
22};
23
24pub fn parse_transaction_from_rpc(
46 rpc_client: &RpcClient,
47 signature: &Signature,
48 filter: Option<&EventTypeFilter>,
49) -> Result<Vec<DexEvent>, ParseError> {
50 let config = RpcTransactionConfig {
52 encoding: Some(UiTransactionEncoding::Base64),
53 commitment: None,
54 max_supported_transaction_version: Some(0),
55 };
56
57 let rpc_tx = rpc_client.get_transaction_with_config(signature, config).map_err(|e| {
58 let msg = e.to_string();
59 if msg.contains("invalid type: null") && msg.contains("EncodedConfirmedTransactionWithStatusMeta") {
60 ParseError::RpcError(format!(
61 "Transaction not found (RPC returned null). Common causes: 1) Transaction is too old and pruned (use an archive RPC). 2) Wrong network or invalid signature. Try SOLANA_RPC_URL with an archive endpoint (e.g. Helius, QuickNode) or a more recent tx. Original: {}",
62 msg
63 ))
64 } else {
65 ParseError::RpcError(msg)
66 }
67 })?;
68
69 parse_rpc_transaction(&rpc_tx, filter)
70}
71
72pub fn parse_rpc_transaction(
89 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
90 filter: Option<&EventTypeFilter>,
91) -> Result<Vec<DexEvent>, ParseError> {
92 let (grpc_meta, grpc_tx) = convert_rpc_to_grpc(rpc_tx)?;
94
95 let signature = extract_signature(rpc_tx)?;
97 let slot = rpc_tx.slot;
98 let block_time_us = rpc_tx.block_time.map(|t| t * 1_000_000);
99 let grpc_recv_us = std::time::SystemTime::now()
100 .duration_since(std::time::UNIX_EPOCH)
101 .unwrap()
102 .as_micros() as i64;
103
104 let grpc_tx_opt = Some(grpc_tx);
106
107 let recent_blockhash = grpc_tx_opt
108 .as_ref()
109 .and_then(|t| t.message.as_ref())
110 .and_then(|m| {
111 if m.recent_blockhash.is_empty() {
112 None
113 } else {
114 Some(m.recent_blockhash.clone())
115 }
116 });
117
118 let mut program_invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::new();
121
122 if let Some(ref tx) = grpc_tx_opt {
123 if let Some(ref msg) = tx.message {
124 let keys_len = msg.account_keys.len();
126 let writable_len = grpc_meta.loaded_writable_addresses.len();
127 let get_key = |i: usize| -> Option<&Vec<u8>> {
128 if i < keys_len {
129 msg.account_keys.get(i)
130 } else if i < keys_len + writable_len {
131 grpc_meta.loaded_writable_addresses.get(i - keys_len)
132 } else {
133 grpc_meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
134 }
135 };
136
137 for (i, ix) in msg.instructions.iter().enumerate() {
139 let pid = get_key(ix.program_id_index as usize)
140 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
141 let pid_str = pid.to_string();
142 let pid_static: &'static str = pid_str.leak();
143 program_invokes.entry(pid_static).or_default().push((i as i32, -1));
144 }
145
146 for inner in &grpc_meta.inner_instructions {
148 let outer_idx = inner.index as usize;
149 for (j, inner_ix) in inner.instructions.iter().enumerate() {
150 let pid = get_key(inner_ix.program_id_index as usize)
151 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
152 let pid_str = pid.to_string();
153 let pid_static: &'static str = pid_str.leak();
154 program_invokes.entry(pid_static).or_default().push((outer_idx as i32, j as i32));
155 }
156 }
157 }
158 }
159
160 let mut events = parse_instructions_enhanced(
162 &grpc_meta,
163 &grpc_tx_opt,
164 signature,
165 slot,
166 0, block_time_us,
168 grpc_recv_us,
169 filter,
170 );
171
172 let mut is_created_buy = false;
174
175 for log in &grpc_meta.log_messages {
176 if let Some(mut event) = crate::logs::parse_log(
177 log,
178 signature,
179 slot,
180 0, block_time_us,
182 grpc_recv_us,
183 filter,
184 is_created_buy,
185 recent_blockhash.as_deref(),
186 ) {
187 if matches!(event, DexEvent::PumpFunCreate(_) | DexEvent::PumpFunCreateV2(_)) {
189 is_created_buy = true;
190 }
191
192 crate::core::account_dispatcher::fill_accounts_from_transaction_data(
194 &mut event,
195 &grpc_meta,
196 &grpc_tx_opt,
197 &program_invokes,
198 );
199
200 crate::core::common_filler::fill_data(
202 &mut event,
203 &grpc_meta,
204 &grpc_tx_opt,
205 &program_invokes,
206 );
207
208 events.push(event);
209 }
210 }
211
212 Ok(events)
213}
214
/// Failure modes of the RPC transaction parsing pipeline.
#[derive(Debug)]
pub enum ParseError {
    /// The RPC request itself failed; carries the error text.
    RpcError(String),
    /// The payload could not be decoded or converted; carries a description.
    ConversionError(String),
    /// A required field was absent from the RPC response; carries the field name.
    MissingField(String),
}

impl std::fmt::Display for ParseError {
    /// Renders the error as `"<category>: <detail>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (label, detail) = match self {
            Self::RpcError(detail) => ("RPC error", detail),
            Self::ConversionError(detail) => ("Conversion error", detail),
            Self::MissingField(detail) => ("Missing field", detail),
        };
        write!(f, "{}: {}", label, detail)
    }
}

impl std::error::Error for ParseError {}
234
235fn extract_signature(
240 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
241) -> Result<Signature, ParseError> {
242 let ui_tx = &rpc_tx.transaction.transaction;
243
244 match ui_tx {
245 EncodedTransaction::Binary(data, _encoding) => {
246 let bytes = general_purpose::STANDARD.decode(data)
247 .map_err(|e| ParseError::ConversionError(format!("Failed to decode base64: {}", e)))?;
248
249 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
250 bincode::deserialize(&bytes).map_err(|e| {
251 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
252 })?;
253
254 Ok(versioned_tx.signatures[0])
255 }
256 _ => Err(ParseError::ConversionError(
257 "Unsupported transaction encoding".to_string(),
258 )),
259 }
260}
261
262pub fn convert_rpc_to_grpc(
263 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
264) -> Result<(TransactionStatusMeta, Transaction), ParseError> {
265 let rpc_meta = rpc_tx
266 .transaction
267 .meta
268 .as_ref()
269 .ok_or_else(|| ParseError::MissingField("meta".to_string()))?;
270
271 let mut grpc_meta = TransactionStatusMeta {
273 err: None,
274 fee: rpc_meta.fee,
275 pre_balances: rpc_meta.pre_balances.clone(),
276 post_balances: rpc_meta.post_balances.clone(),
277 inner_instructions: Vec::new(),
278 log_messages: {
279 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
280 opt.unwrap_or_default()
281 },
282 pre_token_balances: Vec::new(),
283 post_token_balances: Vec::new(),
284 rewards: Vec::new(),
285 loaded_writable_addresses: {
286 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
287 rpc_meta.loaded_addresses.clone().into();
288 loaded_opt
289 .map(|addrs| {
290 addrs
291 .writable
292 .iter()
293 .map(|pk_str| {
294 use std::str::FromStr;
295 solana_sdk::pubkey::Pubkey::from_str(pk_str)
296 .unwrap()
297 .to_bytes()
298 .to_vec()
299 })
300 .collect()
301 })
302 .unwrap_or_default()
303 },
304 loaded_readonly_addresses: {
305 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
306 rpc_meta.loaded_addresses.clone().into();
307 loaded_opt
308 .map(|addrs| {
309 addrs
310 .readonly
311 .iter()
312 .map(|pk_str| {
313 use std::str::FromStr;
314 solana_sdk::pubkey::Pubkey::from_str(pk_str)
315 .unwrap()
316 .to_bytes()
317 .to_vec()
318 })
319 .collect()
320 })
321 .unwrap_or_default()
322 },
323 return_data: None,
324 compute_units_consumed: rpc_meta.compute_units_consumed.clone().into(),
325 cost_units: None,
326 inner_instructions_none: {
327 let opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
328 opt.is_none()
329 },
330 log_messages_none: {
331 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
332 opt.is_none()
333 },
334 return_data_none: {
335 let opt: Option<solana_transaction_status::UiTransactionReturnData> = rpc_meta.return_data.clone().into();
336 opt.is_none()
337 },
338 };
339
340 let inner_instructions_opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
342 if let Some(ref inner_instructions) = inner_instructions_opt {
343 for inner in inner_instructions {
344 let mut grpc_inner = InnerInstructions {
345 index: inner.index as u32,
346 instructions: Vec::new(),
347 };
348
349 for ix in &inner.instructions {
350 if let solana_transaction_status::UiInstruction::Compiled(compiled) = ix {
351 let data = bs58::decode(&compiled.data)
353 .into_vec()
354 .map_err(|e| {
355 ParseError::ConversionError(format!(
356 "Failed to decode instruction data: {}",
357 e
358 ))
359 })?;
360
361 grpc_inner.instructions.push(InnerInstruction {
362 program_id_index: compiled.program_id_index as u32,
363 accounts: compiled.accounts.clone(),
364 data,
365 stack_height: compiled.stack_height.map(|h| h as u32),
366 });
367 }
368 }
369
370 grpc_meta.inner_instructions.push(grpc_inner);
371 }
372 }
373
374 let ui_tx = &rpc_tx.transaction.transaction;
376
377 let (message, signatures) = match ui_tx {
378 EncodedTransaction::Binary(data, _encoding) => {
379 let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
381 ParseError::ConversionError(format!("Failed to decode base64: {}", e))
382 })?;
383
384 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
386 bincode::deserialize(&bytes).map_err(|e| {
387 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
388 })?;
389
390 let sigs: Vec<Vec<u8>> = versioned_tx
391 .signatures
392 .iter()
393 .map(|s| s.as_ref().to_vec())
394 .collect();
395
396 let message = match versioned_tx.message {
397 solana_sdk::message::VersionedMessage::Legacy(legacy_msg) => {
398 convert_legacy_message(&legacy_msg)?
399 }
400 solana_sdk::message::VersionedMessage::V0(v0_msg) => convert_v0_message(&v0_msg)?,
401 };
402
403 (message, sigs)
404 }
405 EncodedTransaction::Json(_) => {
406 return Err(ParseError::ConversionError(
407 "JSON encoded transactions not supported yet".to_string(),
408 ));
409 }
410 _ => {
411 return Err(ParseError::ConversionError(
412 "Unsupported transaction encoding".to_string(),
413 ));
414 }
415 };
416
417 let grpc_tx = Transaction {
418 signatures,
419 message: Some(message),
420 };
421
422 Ok((grpc_meta, grpc_tx))
423}
424
425fn convert_legacy_message(
426 msg: &solana_sdk::message::legacy::Message,
427) -> Result<Message, ParseError> {
428 let account_keys: Vec<Vec<u8>> = msg
429 .account_keys
430 .iter()
431 .map(|k| k.to_bytes().to_vec())
432 .collect();
433
434 let instructions: Vec<CompiledInstruction> = msg
435 .instructions
436 .iter()
437 .map(|ix| CompiledInstruction {
438 program_id_index: ix.program_id_index as u32,
439 accounts: ix.accounts.clone(),
440 data: ix.data.clone(),
441 })
442 .collect();
443
444 Ok(Message {
445 header: Some(MessageHeader {
446 num_required_signatures: msg.header.num_required_signatures as u32,
447 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
448 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
449 }),
450 account_keys,
451 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
452 instructions,
453 versioned: false,
454 address_table_lookups: Vec::new(),
455 })
456}
457
458fn convert_v0_message(msg: &solana_sdk::message::v0::Message) -> Result<Message, ParseError> {
459 let account_keys: Vec<Vec<u8>> = msg
460 .account_keys
461 .iter()
462 .map(|k| k.to_bytes().to_vec())
463 .collect();
464
465 let instructions: Vec<CompiledInstruction> = msg
466 .instructions
467 .iter()
468 .map(|ix| CompiledInstruction {
469 program_id_index: ix.program_id_index as u32,
470 accounts: ix.accounts.clone(),
471 data: ix.data.clone(),
472 })
473 .collect();
474
475 Ok(Message {
476 header: Some(MessageHeader {
477 num_required_signatures: msg.header.num_required_signatures as u32,
478 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
479 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
480 }),
481 account_keys,
482 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
483 instructions,
484 versioned: true,
485 address_table_lookups: msg
486 .address_table_lookups
487 .iter()
488 .map(|lookup| MessageAddressTableLookup {
489 account_key: lookup.account_key.to_bytes().to_vec(),
490 writable_indexes: lookup.writable_indexes.clone(),
491 readonly_indexes: lookup.readonly_indexes.clone(),
492 })
493 .collect(),
494 })
495}