1use crate::core::events::DexEvent;
7use crate::grpc::instruction_parser::parse_instructions_enhanced;
8use crate::grpc::types::EventTypeFilter;
9use crate::instr::read_pubkey_fast;
10use base64::{Engine as _, engine::general_purpose};
11use solana_client::rpc_client::RpcClient;
12use solana_client::rpc_config::RpcTransactionConfig;
13use solana_sdk::signature::Signature;
14use solana_sdk::pubkey::Pubkey;
15use solana_transaction_status::{
16 EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction, UiTransactionEncoding,
17};
18use std::collections::HashMap;
19use yellowstone_grpc_proto::prelude::{
20 CompiledInstruction, InnerInstruction, InnerInstructions, Message, MessageAddressTableLookup, MessageHeader,
21 Transaction, TransactionStatusMeta,
22};
23
24pub fn parse_transaction_from_rpc(
46 rpc_client: &RpcClient,
47 signature: &Signature,
48 filter: Option<&EventTypeFilter>,
49) -> Result<Vec<DexEvent>, ParseError> {
50 let config = RpcTransactionConfig {
52 encoding: Some(UiTransactionEncoding::Base64),
53 commitment: None,
54 max_supported_transaction_version: Some(0),
55 };
56
57 let rpc_tx = rpc_client
58 .get_transaction_with_config(signature, config)
59 .map_err(|e| ParseError::RpcError(e.to_string()))?;
60
61 parse_rpc_transaction(&rpc_tx, filter)
62}
63
64pub fn parse_rpc_transaction(
81 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
82 filter: Option<&EventTypeFilter>,
83) -> Result<Vec<DexEvent>, ParseError> {
84 let (grpc_meta, grpc_tx) = convert_rpc_to_grpc(rpc_tx)?;
86
87 let signature = extract_signature(rpc_tx)?;
89 let slot = rpc_tx.slot;
90 let block_time_us = rpc_tx.block_time.map(|t| t * 1_000_000);
91 let grpc_recv_us = std::time::SystemTime::now()
92 .duration_since(std::time::UNIX_EPOCH)
93 .unwrap()
94 .as_micros() as i64;
95
96 let grpc_tx_opt = Some(grpc_tx);
98
99 let mut program_invokes: HashMap<&str, Vec<(i32, i32)>> = HashMap::new();
102
103 if let Some(ref tx) = grpc_tx_opt {
104 if let Some(ref msg) = tx.message {
105 let keys_len = msg.account_keys.len();
107 let writable_len = grpc_meta.loaded_writable_addresses.len();
108 let get_key = |i: usize| -> Option<&Vec<u8>> {
109 if i < keys_len {
110 msg.account_keys.get(i)
111 } else if i < keys_len + writable_len {
112 grpc_meta.loaded_writable_addresses.get(i - keys_len)
113 } else {
114 grpc_meta.loaded_readonly_addresses.get(i - keys_len - writable_len)
115 }
116 };
117
118 for (i, ix) in msg.instructions.iter().enumerate() {
120 let pid = get_key(ix.program_id_index as usize)
121 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
122 let pid_str = pid.to_string();
123 let pid_static: &'static str = pid_str.leak();
124 program_invokes.entry(pid_static).or_default().push((i as i32, -1));
125 }
126
127 for inner in &grpc_meta.inner_instructions {
129 let outer_idx = inner.index as usize;
130 for (j, inner_ix) in inner.instructions.iter().enumerate() {
131 let pid = get_key(inner_ix.program_id_index as usize)
132 .map_or(Pubkey::default(), |k| read_pubkey_fast(k));
133 let pid_str = pid.to_string();
134 let pid_static: &'static str = pid_str.leak();
135 program_invokes.entry(pid_static).or_default().push((outer_idx as i32, j as i32));
136 }
137 }
138 }
139 }
140
141 let mut events = parse_instructions_enhanced(
143 &grpc_meta,
144 &grpc_tx_opt,
145 signature,
146 slot,
147 0, block_time_us,
149 grpc_recv_us,
150 filter,
151 );
152
153 let mut is_created_buy = false;
155
156 for log in &grpc_meta.log_messages {
157 if let Some(mut event) = crate::logs::parse_log(
158 log,
159 signature,
160 slot,
161 0, block_time_us,
163 grpc_recv_us,
164 filter,
165 is_created_buy,
166 ) {
167 if matches!(event, DexEvent::PumpFunCreate(_)) {
169 is_created_buy = true;
170 }
171
172 crate::core::account_dispatcher::fill_accounts_from_transaction_data(
174 &mut event,
175 &grpc_meta,
176 &grpc_tx_opt,
177 &program_invokes,
178 );
179
180 crate::core::common_filler::fill_data(
182 &mut event,
183 &grpc_meta,
184 &grpc_tx_opt,
185 &program_invokes,
186 );
187
188 events.push(event);
189 }
190 }
191
192 Ok(events)
193}
194
/// Failure modes of fetching and converting an RPC transaction.
#[derive(Debug)]
pub enum ParseError {
    /// The RPC request failed; holds the stringified client error.
    RpcError(String),
    /// A payload could not be decoded or converted; holds a description.
    ConversionError(String),
    /// A required field was absent; holds the field name.
    MissingField(String),
}

impl std::fmt::Display for ParseError {
    /// Writes a human-readable category prefix followed by the stored message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::RpcError(detail) => f.write_fmt(format_args!("RPC error: {}", detail)),
            Self::ConversionError(detail) => {
                f.write_fmt(format_args!("Conversion error: {}", detail))
            }
            Self::MissingField(detail) => f.write_fmt(format_args!("Missing field: {}", detail)),
        }
    }
}

impl std::error::Error for ParseError {}
214
215fn extract_signature(
220 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
221) -> Result<Signature, ParseError> {
222 let ui_tx = &rpc_tx.transaction.transaction;
223
224 match ui_tx {
225 EncodedTransaction::Binary(data, _encoding) => {
226 let bytes = general_purpose::STANDARD.decode(data)
227 .map_err(|e| ParseError::ConversionError(format!("Failed to decode base64: {}", e)))?;
228
229 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
230 bincode::deserialize(&bytes).map_err(|e| {
231 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
232 })?;
233
234 Ok(versioned_tx.signatures[0])
235 }
236 _ => Err(ParseError::ConversionError(
237 "Unsupported transaction encoding".to_string(),
238 )),
239 }
240}
241
242pub fn convert_rpc_to_grpc(
243 rpc_tx: &EncodedConfirmedTransactionWithStatusMeta,
244) -> Result<(TransactionStatusMeta, Transaction), ParseError> {
245 let rpc_meta = rpc_tx
246 .transaction
247 .meta
248 .as_ref()
249 .ok_or_else(|| ParseError::MissingField("meta".to_string()))?;
250
251 let mut grpc_meta = TransactionStatusMeta {
253 err: None,
254 fee: rpc_meta.fee,
255 pre_balances: rpc_meta.pre_balances.clone(),
256 post_balances: rpc_meta.post_balances.clone(),
257 inner_instructions: Vec::new(),
258 log_messages: {
259 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
260 opt.unwrap_or_default()
261 },
262 pre_token_balances: Vec::new(),
263 post_token_balances: Vec::new(),
264 rewards: Vec::new(),
265 loaded_writable_addresses: {
266 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
267 rpc_meta.loaded_addresses.clone().into();
268 loaded_opt
269 .map(|addrs| {
270 addrs
271 .writable
272 .iter()
273 .map(|pk_str| {
274 use std::str::FromStr;
275 solana_sdk::pubkey::Pubkey::from_str(pk_str)
276 .unwrap()
277 .to_bytes()
278 .to_vec()
279 })
280 .collect()
281 })
282 .unwrap_or_default()
283 },
284 loaded_readonly_addresses: {
285 let loaded_opt: Option<solana_transaction_status::UiLoadedAddresses> =
286 rpc_meta.loaded_addresses.clone().into();
287 loaded_opt
288 .map(|addrs| {
289 addrs
290 .readonly
291 .iter()
292 .map(|pk_str| {
293 use std::str::FromStr;
294 solana_sdk::pubkey::Pubkey::from_str(pk_str)
295 .unwrap()
296 .to_bytes()
297 .to_vec()
298 })
299 .collect()
300 })
301 .unwrap_or_default()
302 },
303 return_data: None,
304 compute_units_consumed: rpc_meta.compute_units_consumed.clone().into(),
305 cost_units: None,
306 inner_instructions_none: {
307 let opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
308 opt.is_none()
309 },
310 log_messages_none: {
311 let opt: Option<Vec<String>> = rpc_meta.log_messages.clone().into();
312 opt.is_none()
313 },
314 return_data_none: {
315 let opt: Option<solana_transaction_status::UiTransactionReturnData> = rpc_meta.return_data.clone().into();
316 opt.is_none()
317 },
318 };
319
320 let inner_instructions_opt: Option<Vec<_>> = rpc_meta.inner_instructions.clone().into();
322 if let Some(ref inner_instructions) = inner_instructions_opt {
323 for inner in inner_instructions {
324 let mut grpc_inner = InnerInstructions {
325 index: inner.index as u32,
326 instructions: Vec::new(),
327 };
328
329 for ix in &inner.instructions {
330 if let solana_transaction_status::UiInstruction::Compiled(compiled) = ix {
331 let data = bs58::decode(&compiled.data)
333 .into_vec()
334 .map_err(|e| {
335 ParseError::ConversionError(format!(
336 "Failed to decode instruction data: {}",
337 e
338 ))
339 })?;
340
341 grpc_inner.instructions.push(InnerInstruction {
342 program_id_index: compiled.program_id_index as u32,
343 accounts: compiled.accounts.clone(),
344 data,
345 stack_height: compiled.stack_height.map(|h| h as u32),
346 });
347 }
348 }
349
350 grpc_meta.inner_instructions.push(grpc_inner);
351 }
352 }
353
354 let ui_tx = &rpc_tx.transaction.transaction;
356
357 let (message, signatures) = match ui_tx {
358 EncodedTransaction::Binary(data, _encoding) => {
359 let bytes = general_purpose::STANDARD.decode(data).map_err(|e| {
361 ParseError::ConversionError(format!("Failed to decode base64: {}", e))
362 })?;
363
364 let versioned_tx: solana_sdk::transaction::VersionedTransaction =
366 bincode::deserialize(&bytes).map_err(|e| {
367 ParseError::ConversionError(format!("Failed to deserialize transaction: {}", e))
368 })?;
369
370 let sigs: Vec<Vec<u8>> = versioned_tx
371 .signatures
372 .iter()
373 .map(|s| s.as_ref().to_vec())
374 .collect();
375
376 let message = match versioned_tx.message {
377 solana_sdk::message::VersionedMessage::Legacy(legacy_msg) => {
378 convert_legacy_message(&legacy_msg)?
379 }
380 solana_sdk::message::VersionedMessage::V0(v0_msg) => convert_v0_message(&v0_msg)?,
381 };
382
383 (message, sigs)
384 }
385 EncodedTransaction::Json(_) => {
386 return Err(ParseError::ConversionError(
387 "JSON encoded transactions not supported yet".to_string(),
388 ));
389 }
390 _ => {
391 return Err(ParseError::ConversionError(
392 "Unsupported transaction encoding".to_string(),
393 ));
394 }
395 };
396
397 let grpc_tx = Transaction {
398 signatures,
399 message: Some(message),
400 };
401
402 Ok((grpc_meta, grpc_tx))
403}
404
405fn convert_legacy_message(
406 msg: &solana_sdk::message::legacy::Message,
407) -> Result<Message, ParseError> {
408 let account_keys: Vec<Vec<u8>> = msg
409 .account_keys
410 .iter()
411 .map(|k| k.to_bytes().to_vec())
412 .collect();
413
414 let instructions: Vec<CompiledInstruction> = msg
415 .instructions
416 .iter()
417 .map(|ix| CompiledInstruction {
418 program_id_index: ix.program_id_index as u32,
419 accounts: ix.accounts.clone(),
420 data: ix.data.clone(),
421 })
422 .collect();
423
424 Ok(Message {
425 header: Some(MessageHeader {
426 num_required_signatures: msg.header.num_required_signatures as u32,
427 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
428 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
429 }),
430 account_keys,
431 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
432 instructions,
433 versioned: false,
434 address_table_lookups: Vec::new(),
435 })
436}
437
438fn convert_v0_message(msg: &solana_sdk::message::v0::Message) -> Result<Message, ParseError> {
439 let account_keys: Vec<Vec<u8>> = msg
440 .account_keys
441 .iter()
442 .map(|k| k.to_bytes().to_vec())
443 .collect();
444
445 let instructions: Vec<CompiledInstruction> = msg
446 .instructions
447 .iter()
448 .map(|ix| CompiledInstruction {
449 program_id_index: ix.program_id_index as u32,
450 accounts: ix.accounts.clone(),
451 data: ix.data.clone(),
452 })
453 .collect();
454
455 Ok(Message {
456 header: Some(MessageHeader {
457 num_required_signatures: msg.header.num_required_signatures as u32,
458 num_readonly_signed_accounts: msg.header.num_readonly_signed_accounts as u32,
459 num_readonly_unsigned_accounts: msg.header.num_readonly_unsigned_accounts as u32,
460 }),
461 account_keys,
462 recent_blockhash: msg.recent_blockhash.to_bytes().to_vec(),
463 instructions,
464 versioned: true,
465 address_table_lookups: msg
466 .address_table_lookups
467 .iter()
468 .map(|lookup| MessageAddressTableLookup {
469 account_key: lookup.account_key.to_bytes().to_vec(),
470 writable_indexes: lookup.writable_indexes.clone(),
471 readonly_indexes: lookup.readonly_indexes.clone(),
472 })
473 .collect(),
474 })
475}