1use {
23 crate::{
24 deserialize::CarbonDeserialize, error::CarbonResult, filter::Filter,
25 metrics::MetricsCollection, processor::Processor, transaction::TransactionMetadata,
26 },
27 async_trait::async_trait,
28 serde::{Deserialize, Serialize},
29 solana_instruction::AccountMeta,
30 solana_pubkey::Pubkey,
31 std::{
32 ops::{Deref, DerefMut},
33 sync::Arc,
34 },
35};
36
37#[derive(Debug, Clone)]
57pub struct InstructionMetadata {
58 pub transaction_metadata: Arc<TransactionMetadata>,
59 pub stack_height: u32,
60 pub index: u32,
61 pub absolute_path: Vec<u8>,
62}
63
/// Classification of a single transaction log line.
#[derive(Debug)]
enum LogType {
    /// A program invocation started; carries the bracketed stack height.
    Start(usize),
    /// A line that is none of the other kinds.
    Data,
    /// A compute-unit consumption report.
    CU,
    /// A program invocation ended (successfully or not).
    Finish,
}
71
72impl InstructionMetadata {
73 pub fn decode_log_events<T: CarbonDeserialize>(&self) -> Vec<T> {
83 self.extract_event_log_data()
84 .into_iter()
85 .filter(|log| log.len() >= 8)
86 .filter_map(|log| <T as CarbonDeserialize>::deserialize(&log[8..]))
87 .collect()
88 }
89
90 fn extract_event_log_data(&self) -> Vec<Vec<u8>> {
98 let logs = match &self.transaction_metadata.meta.log_messages {
99 Some(logs) => logs,
100 None => return Vec::new(),
101 };
102
103 let mut extracted_logs = Vec::new();
104 let mut current_stack_height = 0usize;
105 let mut last_stack_height = 0usize;
106
107 let mut position_at_level: std::collections::HashMap<usize, u8> =
108 std::collections::HashMap::new();
109
110 for log in logs {
111 let parsed_log = self.parse_log(log);
112
113 match parsed_log {
114 LogType::Start(stack_height) => {
115 current_stack_height = stack_height;
116
117 let current_pos = if stack_height > last_stack_height {
118 0
119 } else {
120 position_at_level
121 .get(&stack_height)
122 .map(|&pos| pos + 1)
123 .unwrap_or(0)
124 };
125
126 position_at_level.insert(stack_height, current_pos);
127 last_stack_height = stack_height;
128 }
129 LogType::Finish => {
130 current_stack_height = current_stack_height.saturating_sub(1);
131 }
132 _ => {}
133 }
134
135 let current_path: Vec<u8> = (1..=current_stack_height)
136 .map(|level| position_at_level.get(&level).copied().unwrap_or(0))
137 .collect();
138
139 if current_path == self.absolute_path && matches!(parsed_log, LogType::Data) {
140 if let Some(data) = log.split_whitespace().last() {
141 if let Ok(buf) =
142 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, data)
143 {
144 extracted_logs.push(buf);
145 }
146 }
147 }
148 }
149
150 extracted_logs
151 }
152
153 fn parse_log(&self, log: &str) -> LogType {
155 if log.starts_with("Program ") && log.contains(" invoke [") {
156 let parts: Vec<&str> = log.split_whitespace().collect();
157 if parts.len() >= 4 && parts[0] == "Program" && parts[2] == "invoke" {
158 let level_str = parts[3].trim_start_matches('[').trim_end_matches(']');
159 if let Ok(level) = level_str.parse::<usize>() {
160 return LogType::Start(level);
161 }
162 }
163 } else if log.starts_with("Program ")
164 && (log.ends_with(" success") || log.contains(" failed"))
165 {
166 let parts: Vec<&str> = log.split_whitespace().collect();
167 if parts.len() >= 3 && parts[0] == "Program" {
168 return LogType::Finish;
169 }
170 } else if log.contains("consumed") && log.contains("compute units") {
171 return LogType::CU;
172 }
173
174 LogType::Data
175 }
176}
177
178pub type InstructionsWithMetadata = Vec<(InstructionMetadata, solana_instruction::Instruction)>;
179
180#[derive(Debug, Clone, Deserialize, Serialize)]
198pub struct DecodedInstruction<T> {
199 pub program_id: Pubkey,
200 pub data: T,
201 pub accounts: Vec<AccountMeta>,
202}
203
204pub trait InstructionDecoder<'a> {
220 type InstructionType;
221
222 fn decode_instruction(
223 &self,
224 instruction: &'a solana_instruction::Instruction,
225 ) -> Option<DecodedInstruction<Self::InstructionType>>;
226}
227
228pub type InstructionProcessorInputType<T> = (
232 InstructionMetadata,
233 DecodedInstruction<T>,
234 NestedInstructions,
235 solana_instruction::Instruction,
236);
237
238pub struct InstructionPipe<T: Send> {
258 pub decoder:
259 Box<dyn for<'a> InstructionDecoder<'a, InstructionType = T> + Send + Sync + 'static>,
260 pub processor:
261 Box<dyn Processor<InputType = InstructionProcessorInputType<T>> + Send + Sync + 'static>,
262 pub filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
263}
264
265#[async_trait]
279pub trait InstructionPipes<'a>: Send + Sync {
280 async fn run(
281 &mut self,
282 nested_instruction: &NestedInstruction,
283 metrics: Arc<MetricsCollection>,
284 ) -> CarbonResult<()>;
285 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>>;
286}
287
288#[async_trait]
289impl<T: Send + 'static> InstructionPipes<'_> for InstructionPipe<T> {
290 async fn run(
291 &mut self,
292 nested_instruction: &NestedInstruction,
293 metrics: Arc<MetricsCollection>,
294 ) -> CarbonResult<()> {
295 log::trace!(
296 "InstructionPipe::run(nested_instruction: {:?}, metrics)",
297 nested_instruction,
298 );
299
300 if let Some(decoded_instruction) = self
301 .decoder
302 .decode_instruction(&nested_instruction.instruction)
303 {
304 self.processor
305 .process(
306 (
307 nested_instruction.metadata.clone(),
308 decoded_instruction,
309 nested_instruction.inner_instructions.clone(),
310 nested_instruction.instruction.clone(),
311 ),
312 metrics.clone(),
313 )
314 .await?;
315 }
316
317 for nested_inner_instruction in nested_instruction.inner_instructions.iter() {
318 self.run(nested_inner_instruction, metrics.clone()).await?;
319 }
320
321 Ok(())
322 }
323
324 fn filters(&self) -> &Vec<Box<dyn Filter + Send + Sync + 'static>> {
325 &self.filters
326 }
327}
328
329#[derive(Debug, Clone)]
343pub struct NestedInstruction {
344 pub metadata: InstructionMetadata,
345 pub instruction: solana_instruction::Instruction,
346 pub inner_instructions: NestedInstructions,
347}
348
349#[derive(Debug, Default)]
350pub struct NestedInstructions(pub Vec<NestedInstruction>);
351
352impl NestedInstructions {
353 pub fn len(&self) -> usize {
354 self.0.len()
355 }
356
357 pub fn is_empty(&self) -> bool {
358 self.len() == 0
359 }
360
361 pub fn push(&mut self, nested_instruction: NestedInstruction) {
362 self.0.push(nested_instruction);
363 }
364}
365
366impl Deref for NestedInstructions {
367 type Target = [NestedInstruction];
368
369 fn deref(&self) -> &[NestedInstruction] {
370 &self.0[..]
371 }
372}
373
374impl DerefMut for NestedInstructions {
375 fn deref_mut(&mut self) -> &mut [NestedInstruction] {
376 &mut self.0[..]
377 }
378}
379
380impl Clone for NestedInstructions {
381 fn clone(&self) -> Self {
382 NestedInstructions(self.0.clone())
383 }
384}
385
386impl IntoIterator for NestedInstructions {
387 type Item = NestedInstruction;
388 type IntoIter = std::vec::IntoIter<NestedInstruction>;
389
390 fn into_iter(self) -> Self::IntoIter {
391 self.0.into_iter()
392 }
393}
394
395impl From<InstructionsWithMetadata> for NestedInstructions {
412 fn from(instructions: InstructionsWithMetadata) -> Self {
413 log::trace!("from(instructions: {:?})", instructions);
414
415 let estimated_capacity = instructions
418 .iter()
419 .filter(|(meta, _)| meta.stack_height == 1)
420 .count();
421
422 UnsafeNestedBuilder::new(estimated_capacity).build(instructions)
423 }
424}
425
/// Largest `stack_height` the nesting builder accepts; it also sizes the
/// builder's per-level pointer table.
pub const MAX_INSTRUCTION_STACK_DEPTH: usize = 5;
428
429pub struct UnsafeNestedBuilder {
430 nested_ixs: Vec<NestedInstruction>,
431 level_ptrs: [Option<*mut NestedInstruction>; MAX_INSTRUCTION_STACK_DEPTH],
432}
433
434impl UnsafeNestedBuilder {
435 pub fn new(capacity: usize) -> Self {
439 Self {
440 nested_ixs: Vec::with_capacity(capacity),
441 level_ptrs: [None; MAX_INSTRUCTION_STACK_DEPTH],
442 }
443 }
444
445 pub fn build(mut self, instructions: InstructionsWithMetadata) -> NestedInstructions {
446 for (metadata, instruction) in instructions {
447 let stack_height = metadata.stack_height as usize;
448
449 assert!(stack_height > 0);
450 assert!(stack_height <= MAX_INSTRUCTION_STACK_DEPTH);
451
452 for ptr in &mut self.level_ptrs[stack_height..] {
453 *ptr = None;
454 }
455
456 let new_instruction = NestedInstruction {
457 metadata,
458 instruction,
459 inner_instructions: NestedInstructions::default(),
460 };
461
462 unsafe {
468 if stack_height == 1 {
469 self.nested_ixs.push(new_instruction);
470 let ptr = self.nested_ixs.last_mut().unwrap_unchecked() as *mut _;
471 self.level_ptrs[0] = Some(ptr);
472 } else if let Some(parent_ptr) = self.level_ptrs[stack_height - 2] {
473 (*parent_ptr).inner_instructions.push(new_instruction);
474 let ptr = (*parent_ptr)
475 .inner_instructions
476 .last_mut()
477 .unwrap_unchecked() as *mut _;
478 self.level_ptrs[stack_height - 1] = Some(ptr);
479 }
480 }
481 }
482
483 NestedInstructions(self.nested_ixs)
484 }
485}
486
#[cfg(test)]
mod tests {

    use {
        super::*, solana_instruction::Instruction, solana_transaction_status::TransactionStatusMeta,
    };

    /// Program id used in the sample log lines.
    const SAMPLE_PROGRAM_ID: &str = "CAMMCzo5YL8w4VFF8KVHrK22GGUsp5VTaW7grrKgrWqK";

    /// Base64 payload carried by the sample `Program data:` log line.
    const SAMPLE_EVENT_BASE64: &str = "QMbN6CYIceLh9Vdh3ndmrpChVVDCYAykCoHLEYdQWNcAxLJNu7nWNHiJzugda0JT2xgyBCWGtm7/oWjb/wT2kcbwA0JRUuwSV88ABSiDPpXudmLYK2jIBhqh3sTXxnR7WMgtjWsyqjga53NruXU9Dj/hyRRE/RQ9xCEh3052KbW6tbtNksNK4HIr+0wAAAAAAAAAAAAAAACz/t2FxQIAAAAAAAAAAAAAACdJpynsFrOoMAAAAAAAAAD4JhBoAxAAAAAAAAAAAAAAhC8BAA==";

    /// Log lines of a single top-level invocation that emits one data line.
    fn sample_log_messages() -> Vec<String> {
        vec![
            format!("Program {} invoke [1]", SAMPLE_PROGRAM_ID),
            format!("Program data: {}", SAMPLE_EVENT_BASE64),
            format!(
                "Program {} consumed 91799 of 185765 compute units",
                SAMPLE_PROGRAM_ID
            ),
            format!("Program {} success", SAMPLE_PROGRAM_ID),
        ]
    }

    /// Builds an instruction with the given position fields, attached to a
    /// transaction carrying the sample logs.
    fn create_instruction_with_metadata(
        index: u32,
        stack_height: u32,
        absolute_path: Vec<u8>,
    ) -> (InstructionMetadata, Instruction) {
        let transaction_metadata = Arc::new(TransactionMetadata {
            meta: TransactionStatusMeta {
                log_messages: Some(sample_log_messages()),
                ..Default::default()
            },
            ..Default::default()
        });

        let metadata = InstructionMetadata {
            transaction_metadata,
            stack_height,
            index,
            absolute_path,
        };

        let instruction = Instruction {
            program_id: Pubkey::new_unique(),
            accounts: vec![AccountMeta::new(Pubkey::new_unique(), false)],
            data: vec![],
        };

        (metadata, instruction)
    }

    #[test]
    fn test_nested_instructions_single_level() {
        let flat = vec![
            create_instruction_with_metadata(1, 1, vec![1]),
            create_instruction_with_metadata(2, 1, vec![2]),
        ];

        let nested = NestedInstructions::from(flat);

        assert_eq!(nested.len(), 2);
        assert!(nested[0].inner_instructions.is_empty());
        assert!(nested[1].inner_instructions.is_empty());
    }

    #[test]
    fn test_nested_instructions_empty() {
        let flat: InstructionsWithMetadata = Vec::new();

        let nested = NestedInstructions::from(flat);

        assert!(nested.is_empty());
    }

    #[test]
    fn test_deep_nested_instructions() {
        let flat = vec![
            create_instruction_with_metadata(0, 1, vec![0]),
            create_instruction_with_metadata(0, 1, vec![0]),
            create_instruction_with_metadata(1, 2, vec![0, 1]),
            create_instruction_with_metadata(1, 3, vec![0, 1, 1]),
            create_instruction_with_metadata(1, 3, vec![0, 1, 1]),
            create_instruction_with_metadata(1, 3, vec![0, 1, 1]),
            create_instruction_with_metadata(1, 3, vec![0, 1, 1]),
        ];

        let nested = NestedInstructions::from(flat);

        assert_eq!(nested.len(), 2);
        assert_eq!(nested.0[1].inner_instructions.len(), 1);
    }

    #[test]
    fn test_extract_event_log_data() {
        let (metadata, _) = create_instruction_with_metadata(0, 1, vec![0]);
        let logs = metadata.extract_event_log_data();

        let expected = base64::Engine::decode(
            &base64::engine::general_purpose::STANDARD,
            SAMPLE_EVENT_BASE64,
        )
        .unwrap();

        assert_eq!(logs.len(), 1);
        assert_eq!(logs[0], expected);
    }
}