1use crate::core::events::DexEvent;
7use crate::grpc::instruction_parser::parse_instructions_enhanced;
8use crate::grpc::types::EventTypeFilter;
9use crate::instr::read_pubkey_fast;
10use base64::{engine::general_purpose, Engine as _};
11use solana_client::rpc_client::RpcClient;
12use solana_client::rpc_config::RpcTransactionConfig;
13use solana_sdk::pubkey::Pubkey;
14use solana_sdk::signature::Signature;
15use solana_transaction_status::{
16 EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction, UiTransactionEncoding,
17};
18use std::collections::HashMap;
19use yellowstone_grpc_proto::prelude::{
20 CompiledInstruction, InnerInstruction, InnerInstructions, Message, MessageAddressTableLookup,
21 MessageHeader, Transaction, TransactionStatusMeta,
22};
23
24pub fn parse_transaction_from_rpc(
46 rpc_client: &RpcClient,
47 signature: &Signature,
48 filter: Option<&EventTypeFilter>,
49) -> Result<Vec<DexEvent>, ParseError> {
50 let config = RpcTransactionConfig {
52 encoding: Some(UiTransactionEncoding::Base64),
53 commitment: None,
54 max_supported_transaction_version: Some(0),
55 };
56
57 let rpc_tx = rpc_client.get_transaction_with_config(signature, config).map_err(|e| {
58 let msg = e.to_string();
59 if msg.contains("invalid type: null") && msg.contains("EncodedConfirmedTransactionWithStatusMeta") {
60 ParseError::RpcError(format!(
61 "Transaction not found (RPC returned null). Common causes: 1) Transaction is too old and pruned (use an archive RPC). 2) Wrong network or invalid signature. Try SOLANA_RPC_URL with an archive endpoint (e.g. Helius, QuickNode) or a more recent tx. Original: {}",
62 msg
63 ))
64 } else {
65 ParseError::RpcError(msg)
66 }
67 })?;
68
69 parse_rpc_transaction(&rpc_tx, filter)
70}
71
72pub fn parse_rpc_transaction(
89 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
90 filter: Option<&EventTypeFilter>,
91) -> Result<Vec<DexEvent>, ParseError> {
92 let (grpc_meta, grpc_tx) = convert_rpc_to_grpc(rpc_tx)?;
94
95 let signature = extract_signature(rpc_tx)?;
97 let slot = rpc_tx.slot;
98 let block_time_us = rpc_tx.block_time.map(|t| t * 1_000_000);
99 let grpc_recv_us =
100 std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_micros()
101 as i64;
102
103 let grpc_tx_opt = Some(grpc_tx);
105
106 let recent_blockhash = grpc_tx_opt.as_ref().and_then(|t| t.message.as_ref()).and_then(|m| {
107 if m.recent_blockhash.is_empty() {
108 None
109 } else {
110 Some(m.recent_blockhash.clone())
111 }
112 });
113
114 let mut program_invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::new();
117
118 if let Some(ref tx) = grpc_tx_opt {
119 if let Some(ref msg) = tx.message {
120 let keys_len = msg.account_keys.len();
122 let writable_len = grpc_meta.loaded_writable_addresses.len();
123 let get_key = |i: usize| -> Option<&Vec<u8>> {
124 if i < keys_len {
125 msg.account_keys.get(i)
126 } else if i < keys_len + writable_len {
127 grpc_meta.loaded_writable_addresses.get(i - keys_len)
128 } else {
129 grpc_meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
130 }
131 };
132
133 for (i, ix) in msg.instructions.iter().enumerate() {
135 let pid = get_key(ix.program_id_index as usize)
136 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
137 let pid_str = pid.to_string();
138 let pid_static: &'static str = pid_str.leak();
139 program_invokes.entry(pid_static).or_default().push((i as i32, -1));
140 }
141
142 for inner in &grpc_meta.inner_instructions {
144 let outer_idx = inner.index as usize;
145 for (j, inner_ix) in inner.instructions.iter().enumerate() {
146 let pid = get_key(inner_ix.program_id_index as usize)
147 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
148 let pid_str = pid.to_string();
149 let pid_static: &'static str = pid_str.leak();
150 program_invokes
151 .entry(pid_static)
152 .or_default()
153 .push((outer_idx as i32, j as i32));
154 }
155 }
156 }
157 }
158
159 let mut events = parse_instructions_enhanced(
161 &grpc_meta,
162 &grpc_tx_opt,
163 signature,
164 slot,
165 0, block_time_us,
167 grpc_recv_us,
168 filter,
169 );
170
171 let mut is_created_buy = false;
173
174 for log in &grpc_meta.log_messages {
175 if let Some(mut event) = crate::logs::parse_log(
176 log,
177 signature,
178 slot,
179 0, block_time_us,
181 grpc_recv_us,
182 filter,
183 is_created_buy,
184 recent_blockhash.as_deref(),
185 ) {
186 if matches!(event, DexEvent::PumpFunCreate(_) | DexEvent::PumpFunCreateV2(_)) {
188 is_created_buy = true;
189 }
190
191 crate::core::account_dispatcher::fill_accounts_from_transaction_data(
193 &mut event,
194 &grpc_meta,
195 &grpc_tx_opt,
196 &program_invokes,
197 );
198
199 crate::core::common_filler::fill_data(
201 &mut event,
202 &grpc_meta,
203 &grpc_tx_opt,
204 &program_invokes,
205 );
206
207 events.push(event);
208 }
209 }
210
211 Ok(events)
212}
213
/// Errors produced while fetching or converting an RPC transaction.
#[derive(Debug)]
pub enum ParseError {
    /// The RPC request failed; carries the error text.
    RpcError(String),
    /// Decoding or converting transaction data failed; carries a description.
    ConversionError(String),
    /// A required field was absent; carries the field name.
    MissingField(String),
}

impl std::fmt::Display for ParseError {
    /// Writes a short category label followed by the carried message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let rendered = match self {
            Self::RpcError(detail) => format_args!("RPC error: {}", detail).to_string(),
            Self::ConversionError(detail) => {
                format_args!("Conversion error: {}", detail).to_string()
            }
            Self::MissingField(detail) => format_args!("Missing field: {}", detail).to_string(),
        };
        f.write_fmt(format_args!("{}", rendered))
    }
}

impl std::error::Error for ParseError {}
233
234fn extract_signature(
239 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
240) -> Result<Signature, ParseError> {
241 let ui_tx = &rpc_tx.transaction.transaction;
242
243 match ui_tx {
244 EncodedTransaction::Binary(data, _encoding) => {
245 let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
246 ParseError::ConversionError(format!("Failed to decode base64: {}", e))
247 })?;
248
249 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
250 bincode::deserialize(&bytes).map_err(|e| {
251 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
252 })?;
253
254 Ok(versioned_tx.signatures[0])
255 }
256 _ => Err(ParseError::ConversionError("Unsupported transaction encoding".to_string())),
257 }
258}
259
260pub fn convert_rpc_to_grpc(
261 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
262) -> Result<(TransactionStatusMeta, Transaction), ParseError> {
263 let rpc_meta = rpc_tx
264 .transaction
265 .meta
266 .as_ref()
267 .ok_or_else(|| ParseError::MissingField("meta".to_string()))?;
268
269 let mut grpc_meta = TransactionStatusMeta {
271 err: None,
272 fee: rpc_meta.fee,
273 pre_balances: rpc_meta.pre_balances.clone(),
274 post_balances: rpc_meta.post_balances.clone(),
275 inner_instructions: Vec::new(),
276 log_messages: {
277 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
278 opt.unwrap_or_default()
279 },
280 pre_token_balances: Vec::new(),
281 post_token_balances: Vec::new(),
282 rewards: Vec::new(),
283 loaded_writable_addresses: {
284 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
285 rpc_meta.loaded_addresses.clone().into();
286 loaded_opt
287 .map(|addrs| {
288 addrs
289 .writable
290 .iter()
291 .map(|pk_str| {
292 use std::str::FromStr;
293 solana_sdk::pubkey::Pubkey::from_str(pk_str)
294 .unwrap()
295 .to_bytes()
296 .to_vec()
297 })
298 .collect()
299 })
300 .unwrap_or_default()
301 },
302 loaded_readonly_addresses: {
303 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
304 rpc_meta.loaded_addresses.clone().into();
305 loaded_opt
306 .map(|addrs| {
307 addrs
308 .readonly
309 .iter()
310 .map(|pk_str| {
311 use std::str::FromStr;
312 solana_sdk::pubkey::Pubkey::from_str(pk_str)
313 .unwrap()
314 .to_bytes()
315 .to_vec()
316 })
317 .collect()
318 })
319 .unwrap_or_default()
320 },
321 return_data: None,
322 compute_units_consumed: rpc_meta.compute_units_consumed.clone().into(),
323
324 inner_instructions_none: {
325 let opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
326 opt.is_none()
327 },
328 log_messages_none: {
329 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
330 opt.is_none()
331 },
332 return_data_none: {
333 let opt: Option<solana_transaction_status::UiTransactionReturnData> =
334 rpc_meta.return_data.clone().into();
335 opt.is_none()
336 },
337 cost_units: rpc_meta.compute_units_consumed.clone().into(),
338 };
339
340 let inner_instructions_opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
342 if let Some(ref inner_instructions) = inner_instructions_opt {
343 for inner in inner_instructions {
344 let mut grpc_inner =
345 InnerInstructions { index: inner.index as u32, instructions: Vec::new() };
346
347 for ix in &inner.instructions {
348 if let solana_transaction_status::UiInstruction::Compiled(compiled) = ix {
349 let data = bs58::decode(&compiled.data).into_vec().map_err(|e| {
351 ParseError::ConversionError(format!(
352 "Failed to decode instruction data: {}",
353 e
354 ))
355 })?;
356
357 grpc_inner.instructions.push(InnerInstruction {
358 program_id_index: compiled.program_id_index as u32,
359 accounts: compiled.accounts.clone(),
360 data,
361 stack_height: compiled.stack_height.map(|h| h as u32),
362 });
363 }
364 }
365
366 grpc_meta.inner_instructions.push(grpc_inner);
367 }
368 }
369
370 let ui_tx = &rpc_tx.transaction.transaction;
372
373 let (message, signatures) = match ui_tx {
374 EncodedTransaction::Binary(data, _encoding) => {
375 let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
377 ParseError::ConversionError(format!("Failed to decode base64: {}", e))
378 })?;
379
380 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
382 bincode::deserialize(&bytes).map_err(|e| {
383 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
384 })?;
385
386 let sigs: Vec<Vec<u8>> =
387 versioned_tx.signatures.iter().map(|s| s.as_ref().to_vec()).collect();
388
389 let message = match versioned_tx.message {
390 solana_sdk::message::VersionedMessage::Legacy(legacy_msg) => {
391 convert_legacy_message(&legacy_msg)?
392 }
393 solana_sdk::message::VersionedMessage::V0(v0_msg) => convert_v0_message(&v0_msg)?,
394 };
395
396 (message, sigs)
397 }
398 EncodedTransaction::Json(_) => {
399 return Err(ParseError::ConversionError(
400 "JSON encoded transactions not supported yet".to_string(),
401 ));
402 }
403 _ => {
404 return Err(ParseError::ConversionError(
405 "Unsupported transaction encoding".to_string(),
406 ));
407 }
408 };
409
410 let grpc_tx = Transaction { signatures, message: Some(message) };
411
412 Ok((grpc_meta, grpc_tx))
413}
414
415fn convert_legacy_message(
416 msg: &solana_sdk::message::legacy::Message,
417) -> Result<Message, ParseError> {
418 let account_keys: Vec<Vec<u8>> =
419 msg.account_keys.iter().map(|k| k.to_bytes().to_vec()).collect();
420
421 let instructions: Vec<CompiledInstruction> = msg
422 .instructions
423 .iter()
424 .map(|ix| CompiledInstruction {
425 program_id_index: ix.program_id_index as u32,
426 accounts: ix.accounts.clone(),
427 data: ix.data.clone(),
428 })
429 .collect();
430
431 Ok(Message {
432 header: Some(MessageHeader {
433 num_required_signatures: msg.header.num_required_signatures as u32,
434 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
435 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
436 }),
437 account_keys,
438 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
439 instructions,
440 versioned: false,
441 address_table_lookups: Vec::new(),
442 })
443}
444
445fn convert_v0_message(msg: &solana_sdk::message::v0::Message) -> Result<Message, ParseError> {
446 let account_keys: Vec<Vec<u8>> =
447 msg.account_keys.iter().map(|k| k.to_bytes().to_vec()).collect();
448
449 let instructions: Vec<CompiledInstruction> = msg
450 .instructions
451 .iter()
452 .map(|ix| CompiledInstruction {
453 program_id_index: ix.program_id_index as u32,
454 accounts: ix.accounts.clone(),
455 data: ix.data.clone(),
456 })
457 .collect();
458
459 Ok(Message {
460 header: Some(MessageHeader {
461 num_required_signatures: msg.header.num_required_signatures as u32,
462 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
463 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
464 }),
465 account_keys,
466 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
467 instructions,
468 versioned: true,
469 address_table_lookups: msg
470 .address_table_lookups
471 .iter()
472 .map(|lookup| MessageAddressTableLookup {
473 account_key: lookup.account_key.to_bytes().to_vec(),
474 writable_indexes: lookup.writable_indexes.clone(),
475 readonly_indexes: lookup.readonly_indexes.clone(),
476 })
477 .collect(),
478 })
479}