Skip to main content

light_event/
parse.rs

1use borsh::BorshDeserialize;
2use light_compressed_account::{
3    compressed_account::{
4        CompressedAccount, CompressedAccountData, PackedCompressedAccountWithMerkleContext,
5    },
6    constants::{
7        ACCOUNT_COMPRESSION_PROGRAM_ID, CREATE_CPI_CONTEXT_ACCOUNT, LIGHT_REGISTRY_PROGRAM_ID,
8        LIGHT_SYSTEM_PROGRAM_ID, REGISTERED_PROGRAM_PDA,
9    },
10    discriminators::*,
11    instruction_data::{
12        data::{InstructionDataInvoke, OutputCompressedAccountWithPackedContext},
13        insert_into_queues::InsertIntoQueuesInstructionData,
14        with_account_info::InstructionDataInvokeCpiWithAccountInfo,
15        with_readonly::InstructionDataInvokeCpiWithReadOnly,
16    },
17    nullifier::create_nullifier,
18    Pubkey,
19};
20use light_token_interface::{
21    instructions::{
22        extensions::ExtensionInstructionData, transfer2::CompressedTokenInstructionDataTransfer2,
23    },
24    LIGHT_TOKEN_PROGRAM_ID, TRANSFER2,
25};
26use light_zero_copy::traits::ZeroCopyAt;
27
28use super::{
29    error::ParseIndexerEventError,
30    event::{
31        AssociatedTokenAccountOwnerInfo, BatchNullifyContext, BatchPublicTransactionEvent,
32        MerkleTreeSequenceNumber, MerkleTreeSequenceNumberV1, NewAddress, PublicTransactionEvent,
33    },
34};
35
/// Uniform view of the Light system program instruction that executed a
/// transaction, regardless of which instruction variant it was decoded from
/// (see `deserialize_instruction`).
#[derive(Debug, Clone, PartialEq)]
struct ExecutingSystemInstruction<'a> {
    /// Output compressed accounts taken from the instruction data.
    output_compressed_accounts: Vec<OutputCompressedAccountWithPackedContext>,
    /// Input compressed accounts taken from the instruction data.
    input_compressed_accounts: Vec<PackedCompressedAccountWithMerkleContext>,
    /// Compression flag from the instruction data. For the read-only and
    /// account-info variants it is only `true` when lamports are non-zero.
    is_compress: bool,
    /// Relay fee from the instruction data; `None` for the read-only and
    /// account-info variants.
    relay_fee: Option<u64>,
    /// Lamports to compress or decompress, if any.
    compress_or_decompress_lamports: Option<u64>,
    /// Whether the instruction data requested a cpi context.
    execute_cpi_context: bool,
    /// Instruction accounts that remain after the leading system accounts
    /// have been split off.
    accounts: &'a [Pubkey],
}
46
/// Instruction indices that make up one cpi pattern found by
/// `find_cpi_pattern`. All indices refer to positions in the transaction's
/// instruction list.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct Indices {
    /// Index of the Light system program instruction chosen as the executing
    /// instruction. Only meaningful when `found_system` is `true`.
    pub system: usize,
    /// Indices of further Light system program instructions encountered while
    /// searching backwards past the executing instruction.
    pub cpi: Vec<usize>,
    /// Index of the account compression instruction the search started from.
    pub insert_into_queues: usize,
    /// Set once a Solana system program instruction was seen during the
    /// backward search.
    pub found_solana_system_program_instruction: bool,
    /// Set once the executing Light system program instruction was found.
    pub found_system: bool,
    /// Index of the token program instruction (if present, only when called from registry)
    pub token: Option<usize>,
    /// Whether registry program was found in the CPI chain (required for token instruction tracking)
    pub found_registry: bool,
}
59
/// Classification of an instruction's program id, produced by
/// `wrap_program_ids` and consumed by the cpi pattern search.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ProgramId {
    /// Light system program instruction that does not create a cpi context account.
    LightSystem,
    /// Account compression insert-into-queues instruction with more than two
    /// accounts whose second account is the registered program PDA.
    AccountCompression,
    /// Instruction whose program id equals `Pubkey::default()`.
    SolanaSystem,
    /// Light token program instruction whose first data byte is `TRANSFER2`.
    LightToken,
    /// Light registry program instruction.
    Registry,
    /// Anything else, including instructions with less than 12 bytes of data.
    Unknown,
}
69
/// The deserialized instructions that belong to one cpi pattern; input to
/// `create_batched_transaction_event`.
#[derive(Debug, Clone, PartialEq)]
struct AssociatedInstructions<'a> {
    /// The Light system program instruction at `Indices::system`.
    pub executing_system_instruction: ExecutingSystemInstruction<'a>,
    /// Output accounts deserialized from the bytes that follow the
    /// insert-into-queues instruction data.
    pub cpi_context_outputs: Vec<OutputCompressedAccountWithPackedContext>,
    /// Zero-copy view of the insert-into-queues instruction data.
    pub insert_into_queues_instruction: InsertIntoQueuesInstructionData<'a>,
    /// Accounts of the insert-into-queues instruction without its first two
    /// entries (signer and registered program).
    pub accounts: &'a [Pubkey],
    /// Token instruction data and accounts for ATA owner extraction
    pub token_instruction: Option<TokenInstructionData<'a>>,
}
79
/// Borrowed token instruction (raw data plus accounts) from which ATA owner
/// info is extracted by `extract_ata_owners`.
#[derive(Debug, Clone, PartialEq)]
pub struct TokenInstructionData<'a> {
    /// Raw instruction data, including the leading discriminator byte.
    pub data: &'a [u8],
    /// Accounts for this instruction
    pub accounts: &'a [Pubkey],
}
88
89/// We piece the event together from 2 instructions:
90/// 1. light_system_program::{Invoke, InvokeCpi, InvokeCpiReadOnly} (one of the 3)
91/// 2. account_compression::InsertIntoQueues
92/// - We return new addresses in batched trees separately
93///   because from the PublicTransactionEvent there
94///   is no way to know which addresses are new and
95///   for batched address trees we need to index the queue of new addresses
96///   the tree&queue account only contains bloomfilters, roots and metadata.
97///
98/// Steps:
99/// 0. Wrap program ids of instructions to filter but not change the pattern
100///         system program cpi context creation ixs
101///         insert into queue ixs not by the system program
102///         instructions with less than 12 bytes ix data
103/// 1. Find associated instructions by cpi pattern.
104/// 2. Deserialize associated instructions.
105/// 3. Create batched transaction events.
106pub fn event_from_light_transaction(
107    program_ids: &[Pubkey],
108    instructions: &[Vec<u8>],
109    accounts: Vec<Vec<Pubkey>>,
110) -> Result<Option<Vec<BatchPublicTransactionEvent>>, ParseIndexerEventError> {
111    // 0. Wrap program ids of instructions to filter but not change the pattern.
112    let program_ids = wrap_program_ids(program_ids, instructions, &accounts);
113    // 1. Find associated instructions by cpi pattern.
114    let mut patterns = find_cpi_patterns(&program_ids);
115    if patterns.is_empty() {
116        return Ok(None);
117    }
118    // We searched from the last pattern to the first.
119    //      -> reverse to be in order
120    patterns.reverse();
121    // 2. Deserialize associated instructions.
122    let associated_instructions = patterns
123        .iter()
124        .map(|pattern| deserialize_associated_instructions(pattern, instructions, &accounts))
125        .collect::<Result<Vec<_>, _>>()?;
126    // 3. Create batched transaction events.
127    let batched_transaction_events = associated_instructions
128        .iter()
129        .map(|associated_instruction| create_batched_transaction_event(associated_instruction))
130        .collect::<Result<Vec<_>, _>>()?;
131
132    // // Sanity checks:
133    // // - this must not throw in production because indexing just works if all instructions are in the same transaction.
134    // // - It's ok if someone misues the cpi context account but transaction data will not be available in photon.
135    // // - if we would throw an error it would brick photon because we would not be able to index a transaction that changed queue state.
136    // // - I could add extra data to the account compression cpi to make this impossible. -> this makes sense it is more robust.
137    // // TODO: make debug
138    // batched_transaction_events.iter().for_each(|event| {
139    //     println!("event: {:?}", event);
140    //     assert_eq!(
141    //         event.event.input_compressed_account_hashes.len(),
142    //         event.batch_input_accounts.len(),
143    //         "Input hashes and input accounts length mismatch "
144    //     );
145    //     assert_eq!(
146    //         event.event.output_compressed_account_hashes.len(),
147    //         event.event.output_leaf_indices.len(),
148    //         "Output hashes and output leaf indices length mismatch "
149    //     );
150    //     assert_eq!(
151    //         event.event.output_compressed_account_hashes.len(),
152    //         event.event.output_compressed_accounts.len(),
153    //         "Output hashes and output compressed accounts length mismatch "
154    //     );
155    // });
156    Ok(Some(batched_transaction_events))
157}
158
159fn deserialize_associated_instructions<'a>(
160    indices: &Indices,
161    instructions: &'a [Vec<u8>],
162    accounts: &'a [Vec<Pubkey>],
163) -> Result<AssociatedInstructions<'a>, ParseIndexerEventError> {
164    let (insert_queues_instruction, cpi_context_outputs) = {
165        let ix = &instructions[indices.insert_into_queues];
166        if ix.len() < 12 {
167            return Err(ParseIndexerEventError::InstructionDataTooSmall(
168                ix.len(),
169                12,
170            ));
171        }
172        let discriminator: [u8; 8] = ix[0..8].try_into().unwrap();
173        if discriminator == DISCRIMINATOR_INSERT_INTO_QUEUES {
174            let (data, bytes) = InsertIntoQueuesInstructionData::zero_copy_at(&ix[12..])?;
175            let cpi_context_outputs =
176                Vec::<OutputCompressedAccountWithPackedContext>::deserialize(&mut &bytes[..])?;
177            Ok((data, cpi_context_outputs))
178        } else {
179            Err(ParseIndexerEventError::DeserializeAccountLightSystemCpiInputsError)
180        }
181    }?;
182    let exec_instruction =
183        deserialize_instruction(&instructions[indices.system], &accounts[indices.system])?;
184
185    // Get token instruction data if present
186    let token_instruction = indices.token.map(|token_idx| TokenInstructionData {
187        data: &instructions[token_idx],
188        accounts: &accounts[token_idx],
189    });
190
191    Ok(AssociatedInstructions {
192        executing_system_instruction: exec_instruction,
193        cpi_context_outputs,
194        insert_into_queues_instruction: insert_queues_instruction,
195        // Remove signer and register program accounts.
196        accounts: &accounts[indices.insert_into_queues][2..],
197        token_instruction,
198    })
199}
200
201/// Filter all system instructions which create cpi context accounts,
202/// so that we can infer that a system program instruction is a light transaction.
203/// Create new AssociatedInstructions when we find a system instruction
204/// if next instruct is solana system program isntruction followed by insert into queues is executable instruction
205/// else is cpi instruction
206/// only push into vec if insert into queues instruction is found
207pub fn find_cpi_patterns(program_ids: &[ProgramId]) -> Vec<Indices> {
208    let mut vec = Vec::new();
209    let mut next_index = usize::MAX;
210    for (last_index, program_id) in (0..program_ids.len()).rev().zip(program_ids.iter().rev()) {
211        // skip last found pattern
212        if last_index > next_index {
213            continue;
214        }
215        // In case that we encounter more than one account compression program ix
216        // before finding one or more system program ix we just overwrite.
217        if let ProgramId::AccountCompression = program_id {
218            let (res, last_index) = find_cpi_pattern(last_index, program_ids);
219            next_index = last_index;
220            if let Some(res) = res {
221                vec.push(res);
222            };
223        }
224    }
225    vec
226}
227
228/// Pattern, SYSTEM_PROGRAM_ID.., default ids .., account compression program id
229/// We search for the pattern in reverse because there can be multiple system instructions
230/// but only one account compression instruction.
231/// Start index points to ACCOUNT_COMPRESSION_PROGRAM_ID
232pub fn find_cpi_pattern(start_index: usize, program_ids: &[ProgramId]) -> (Option<Indices>, usize) {
233    let mut index_account = Indices {
234        insert_into_queues: start_index,
235        ..Default::default()
236    };
237    // Track tentative token index - will only be confirmed if registry is found
238    let mut tentative_token: Option<usize> = None;
239
240    for (index, program_id) in (0..start_index)
241        .rev()
242        .zip(program_ids[..start_index].iter().rev())
243    {
244        if let ProgramId::SolanaSystem = program_id {
245            index_account.found_solana_system_program_instruction = true;
246            continue;
247        } else if matches!(program_id, ProgramId::LightSystem)
248            && index_account.found_solana_system_program_instruction
249            && !index_account.found_system
250        {
251            index_account.system = index;
252            index_account.found_system = true;
253        } else if index_account.found_system && matches!(program_id, ProgramId::LightSystem) {
254            index_account.cpi.push(index);
255        } else if index_account.found_system && matches!(program_id, ProgramId::LightToken) {
256            // Token program Transfer2 instruction in the CPI chain.
257            // Track tentatively - will only be confirmed if registry is found later.
258            // Only track the first one (closest to system instruction).
259            if tentative_token.is_none() {
260                tentative_token = Some(index);
261            }
262        } else if index_account.found_system && matches!(program_id, ProgramId::Registry) {
263            // Registry program instruction - confirms token tracking for ATA owner extraction.
264            // Since we search backwards, registry is found after token in the search order,
265            // but registry is the outer caller in the actual CPI chain.
266            index_account.found_registry = true;
267            // Confirm the tentative token index now that we found registry
268            if index_account.token.is_none() {
269                index_account.token = tentative_token;
270            }
271        } else if matches!(program_id, ProgramId::AccountCompression) && index_account.found_system
272        {
273            // Possibly found next light transaction.
274            return (Some(index_account), index);
275        } else if !index_account.found_system {
276            // If no system program found pattern incomplete.
277            // Else search for cpi instructions until we find account compression program id.
278            return (None, index);
279        }
280    }
281    if index_account.found_system {
282        (Some(index_account), 0)
283    } else {
284        (None, 0)
285    }
286}
287
288pub fn wrap_program_ids(
289    program_ids: &[Pubkey],
290    instructions: &[Vec<u8>],
291    accounts: &[Vec<Pubkey>],
292) -> Vec<ProgramId> {
293    let mut vec = Vec::new();
294    for ((instruction, program_id), accounts) in instructions
295        .iter()
296        .zip(program_ids.iter())
297        .zip(accounts.iter())
298    {
299        if instruction.len() < 12 {
300            vec.push(ProgramId::Unknown);
301            continue;
302        }
303        let discriminator: [u8; 8] = instruction[0..8].try_into().unwrap();
304        if program_id == &Pubkey::default() {
305            vec.push(ProgramId::SolanaSystem);
306        } else if program_id == &LIGHT_SYSTEM_PROGRAM_ID {
307            if discriminator == CREATE_CPI_CONTEXT_ACCOUNT {
308                vec.push(ProgramId::Unknown);
309            } else {
310                vec.push(ProgramId::LightSystem);
311            }
312        } else if program_id == &ACCOUNT_COMPRESSION_PROGRAM_ID {
313            if discriminator == DISCRIMINATOR_INSERT_INTO_QUEUES
314                && accounts.len() > 2
315                && accounts[1] == REGISTERED_PROGRAM_PDA
316            {
317                vec.push(ProgramId::AccountCompression);
318            } else {
319                vec.push(ProgramId::Unknown);
320            }
321        } else if program_id == &Pubkey::from(LIGHT_TOKEN_PROGRAM_ID) {
322            // Token program Transfer2 instruction
323            if !instruction.is_empty() && instruction[0] == TRANSFER2 {
324                vec.push(ProgramId::LightToken);
325            } else {
326                vec.push(ProgramId::Unknown);
327            }
328        } else if program_id == &LIGHT_REGISTRY_PROGRAM_ID {
329            vec.push(ProgramId::Registry);
330        } else {
331            vec.push(ProgramId::Unknown);
332        }
333    }
334    vec
335}
336
337fn deserialize_instruction<'a>(
338    instruction: &'a [u8],
339    accounts: &'a [Pubkey],
340) -> Result<ExecutingSystemInstruction<'a>, ParseIndexerEventError> {
341    if instruction.len() < 12 {
342        return Err(ParseIndexerEventError::InstructionDataTooSmall(
343            instruction.len(),
344            12,
345        ));
346    }
347    let instruction_discriminator = instruction[0..8].try_into().unwrap();
348    let instruction = instruction.split_at(8).1;
349    match instruction_discriminator {
350        // Cannot be exucted with cpi context -> executing tx
351        DISCRIMINATOR_INVOKE => {
352            if accounts.len() < 9 {
353                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
354            }
355            let accounts = accounts.split_at(9).1;
356            // Skips vec size bytes
357            let data = InstructionDataInvoke::deserialize(&mut &instruction[4..])?;
358            Ok(ExecutingSystemInstruction {
359                output_compressed_accounts: data.output_compressed_accounts,
360                input_compressed_accounts: data.input_compressed_accounts_with_merkle_context,
361                is_compress: data.is_compress,
362                relay_fee: data.relay_fee,
363                compress_or_decompress_lamports: data.compress_or_decompress_lamports,
364                execute_cpi_context: false,
365                accounts,
366            })
367        }
368        DISCRIMINATOR_INVOKE_CPI => {
369            if accounts.len() < 11 {
370                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
371            }
372            let accounts = accounts.split_at(11).1;
373            let data = light_compressed_account::instruction_data::invoke_cpi::InstructionDataInvokeCpi::deserialize(
374                &mut &instruction[4..],
375            )?;
376            Ok(ExecutingSystemInstruction {
377                output_compressed_accounts: data.output_compressed_accounts,
378                input_compressed_accounts: data.input_compressed_accounts_with_merkle_context,
379                is_compress: data.is_compress,
380                relay_fee: data.relay_fee,
381                compress_or_decompress_lamports: data.compress_or_decompress_lamports,
382                execute_cpi_context: data.cpi_context.is_some(),
383                accounts,
384            })
385        }
386        DISCRIMINATOR_INVOKE_CPI_WITH_READ_ONLY => {
387            // Min len for a small instruction 3 accounts + 1 tree or queue
388            // Fee payer + authority + registered program + account compression program + account compression authority
389            if accounts.len() < 5 {
390                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
391            }
392            let data: InstructionDataInvokeCpiWithReadOnly =
393                InstructionDataInvokeCpiWithReadOnly::deserialize(&mut &instruction[..])?;
394            let system_accounts_len = if data.mode == 0 {
395                11
396            } else {
397                let mut len = 6; // fee_payer + authority + registered_program + account_compression_program + account_compression_authority + system_program
398                if data.compress_or_decompress_lamports > 0 {
399                    len += 1;
400                }
401                if !data.is_compress && data.compress_or_decompress_lamports > 0 {
402                    len += 1;
403                }
404                if data.with_cpi_context {
405                    len += 1;
406                }
407                len
408            };
409
410            let accounts = accounts.split_at(system_accounts_len).1;
411            Ok(ExecutingSystemInstruction {
412                output_compressed_accounts: data.output_compressed_accounts,
413                input_compressed_accounts: data
414                    .input_compressed_accounts
415                    .iter()
416                    .map(|x| {
417                        x.into_packed_compressed_account_with_merkle_context(
418                            data.invoking_program_id,
419                        )
420                    })
421                    .collect::<Vec<_>>(),
422                is_compress: data.is_compress && data.compress_or_decompress_lamports > 0,
423                relay_fee: None,
424                compress_or_decompress_lamports: if data.compress_or_decompress_lamports == 0 {
425                    None
426                } else {
427                    Some(data.compress_or_decompress_lamports)
428                },
429                execute_cpi_context: data.with_cpi_context,
430                accounts,
431            })
432        }
433        INVOKE_CPI_WITH_ACCOUNT_INFO_INSTRUCTION => {
434            // Min len for a small instruction 4 accounts + 1 tree or queue
435            // Fee payer + authority + registered program + account compression program + account compression authority
436            if accounts.len() < 5 {
437                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
438            }
439            let data: InstructionDataInvokeCpiWithAccountInfo =
440                InstructionDataInvokeCpiWithAccountInfo::deserialize(&mut &instruction[..])?;
441            let system_accounts_len = if data.mode == 0 {
442                11
443            } else {
444                let mut len = 6; // fee_payer + authority + registered_program + account_compression_program + account_compression_authority + system_program
445                if data.compress_or_decompress_lamports > 0 {
446                    len += 1;
447                }
448                if !data.is_compress && data.compress_or_decompress_lamports > 0 {
449                    len += 1;
450                }
451                if data.with_cpi_context {
452                    len += 1;
453                }
454                len
455            };
456            let accounts = accounts.split_at(system_accounts_len).1;
457
458            let instruction = ExecutingSystemInstruction {
459                output_compressed_accounts: data
460                    .account_infos
461                    .iter()
462                    .filter(|x| x.output.is_some())
463                    .map(|x| {
464                        let account = x.output.as_ref().unwrap();
465                        OutputCompressedAccountWithPackedContext {
466                            compressed_account: CompressedAccount {
467                                address: x.address,
468                                owner: data.invoking_program_id,
469                                lamports: account.lamports,
470                                data: Some(CompressedAccountData {
471                                    discriminator: account.discriminator,
472                                    data: account.data.clone(),
473                                    data_hash: account.data_hash,
474                                }),
475                            },
476                            merkle_tree_index: account.output_merkle_tree_index,
477                        }
478                    })
479                    .collect::<Vec<_>>(),
480                input_compressed_accounts: data
481                    .account_infos
482                    .iter()
483                    .filter(|x| x.input.is_some())
484                    .map(|x| {
485                        let account = x.input.as_ref().unwrap();
486                        PackedCompressedAccountWithMerkleContext {
487                            compressed_account: CompressedAccount {
488                                address: x.address,
489                                owner: data.invoking_program_id,
490                                lamports: account.lamports,
491                                data: Some(CompressedAccountData {
492                                    discriminator: account.discriminator,
493                                    data: vec![],
494                                    data_hash: account.data_hash,
495                                }),
496                            },
497                            read_only: false,
498                            root_index: account.root_index,
499                            merkle_context: account.merkle_context,
500                        }
501                    })
502                    .collect::<Vec<_>>(),
503                is_compress: data.is_compress && data.compress_or_decompress_lamports > 0,
504                relay_fee: None,
505                compress_or_decompress_lamports: if data.compress_or_decompress_lamports == 0 {
506                    None
507                } else {
508                    Some(data.compress_or_decompress_lamports)
509                },
510                execute_cpi_context: data.with_cpi_context,
511                accounts,
512            };
513
514            Ok(instruction)
515        }
516        _ => Err(ParseIndexerEventError::DeserializeSystemInstructionError),
517    }
518}
519
520/// Extract ATA owner info from token instruction's out_tlv.
521/// Returns a Vec of (output_index, wallet_owner) for ATAs.
522pub fn extract_ata_owners(
523    token_instruction: &TokenInstructionData,
524) -> Vec<AssociatedTokenAccountOwnerInfo> {
525    let mut ata_owners = Vec::new();
526
527    // Token instruction format: [discriminator (1 byte)] [serialized data]
528    if token_instruction.data.is_empty() || token_instruction.data[0] != TRANSFER2 {
529        return ata_owners;
530    }
531
532    // Skip discriminator byte and deserialize using borsh
533    let data = &token_instruction.data[1..];
534    let Ok(transfer_data) = CompressedTokenInstructionDataTransfer2::deserialize(&mut &data[..])
535    else {
536        return ata_owners;
537    };
538
539    // Check if there's out_tlv data
540    let Some(out_tlv) = transfer_data.out_tlv.as_ref() else {
541        return ata_owners;
542    };
543
544    // Iterate over output TLV entries (one per output token account)
545    for (output_index, tlv_extensions) in out_tlv.iter().enumerate() {
546        // Look for CompressedOnly extension with is_ata=true
547        for ext in tlv_extensions.iter() {
548            if let ExtensionInstructionData::CompressedOnly(compressed_only) = ext {
549                if compressed_only.is_ata {
550                    // Get wallet owner from packed_accounts using owner_index.
551                    // owner_index is an index into packed_accounts, which starts at position 7
552                    // in the Transfer2 accounts array (after the 7 system accounts).
553                    const TRANSFER2_PACKED_ACCOUNTS_OFFSET: usize = 7;
554                    let owner_idx =
555                        compressed_only.owner_index as usize + TRANSFER2_PACKED_ACCOUNTS_OFFSET;
556                    if owner_idx < token_instruction.accounts.len() {
557                        ata_owners.push(AssociatedTokenAccountOwnerInfo {
558                            output_index: output_index as u8,
559                            wallet_owner: token_instruction.accounts[owner_idx],
560                        });
561                    }
562                }
563            }
564        }
565    }
566
567    ata_owners
568}
569
570fn create_batched_transaction_event(
571    associated_instructions: &AssociatedInstructions,
572) -> Result<BatchPublicTransactionEvent, ParseIndexerEventError> {
573    let input_sequence_numbers = associated_instructions
574        .insert_into_queues_instruction
575        .input_sequence_numbers
576        .iter()
577        .map(From::from)
578        .filter(|x: &MerkleTreeSequenceNumber| !(*x).is_empty())
579        .collect::<Vec<MerkleTreeSequenceNumber>>();
580    let mut batched_transaction_event = BatchPublicTransactionEvent {
581        event: PublicTransactionEvent {
582            input_compressed_account_hashes: associated_instructions
583                .insert_into_queues_instruction
584                .nullifiers
585                .iter()
586                .map(|x| x.account_hash)
587                .collect(),
588            output_compressed_account_hashes: associated_instructions
589                .insert_into_queues_instruction
590                .leaves
591                .iter()
592                .map(|x| x.leaf)
593                .collect(),
594            output_compressed_accounts: [
595                associated_instructions.cpi_context_outputs.clone(),
596                associated_instructions
597                    .executing_system_instruction
598                    .output_compressed_accounts
599                    .clone(),
600            ]
601            .concat(),
602            output_leaf_indices: associated_instructions
603                .insert_into_queues_instruction
604                .output_leaf_indices
605                .iter()
606                .map(|x| u32::from(*x))
607                .collect(),
608            sequence_numbers: associated_instructions
609                .insert_into_queues_instruction
610                .output_sequence_numbers
611                .iter()
612                .map(From::from)
613                .filter(|x: &MerkleTreeSequenceNumber| !(*x).is_empty())
614                .map(|x| MerkleTreeSequenceNumberV1 {
615                    seq: x.seq,
616                    tree_pubkey: x.tree_pubkey,
617                })
618                .collect(),
619            relay_fee: associated_instructions
620                .executing_system_instruction
621                .relay_fee,
622            is_compress: associated_instructions
623                .executing_system_instruction
624                .is_compress,
625            compress_or_decompress_lamports: associated_instructions
626                .executing_system_instruction
627                .compress_or_decompress_lamports,
628            pubkey_array: associated_instructions
629                .executing_system_instruction
630                .accounts
631                .to_vec(),
632            message: None,
633            ata_owners: associated_instructions
634                .token_instruction
635                .as_ref()
636                .map(extract_ata_owners)
637                .unwrap_or_default(),
638        },
639        tx_hash: associated_instructions
640            .insert_into_queues_instruction
641            .tx_hash,
642        new_addresses: associated_instructions
643            .insert_into_queues_instruction
644            .addresses
645            .iter()
646            .map(|x| NewAddress {
647                address: x.address,
648                mt_pubkey: associated_instructions.accounts[x.tree_index as usize],
649                queue_index: u64::MAX,
650            })
651            .collect::<Vec<_>>(),
652        address_sequence_numbers: associated_instructions
653            .insert_into_queues_instruction
654            .address_sequence_numbers
655            .iter()
656            .map(From::from)
657            .filter(|x: &MerkleTreeSequenceNumber| !(*x).is_empty())
658            .collect::<Vec<MerkleTreeSequenceNumber>>(),
659        batch_input_accounts: associated_instructions
660            .insert_into_queues_instruction
661            .nullifiers
662            .iter()
663            .filter(|x| {
664                input_sequence_numbers.iter().any(|y| {
665                    y.tree_pubkey == associated_instructions.accounts[x.tree_index as usize]
666                })
667            })
668            .map(|n| {
669                Ok(BatchNullifyContext {
670                    tx_hash: associated_instructions
671                        .insert_into_queues_instruction
672                        .tx_hash,
673                    account_hash: n.account_hash,
674                    nullifier: {
675                        // The nullifier is computed inside the account compression program.
676                        // -> it is not part of the cpi system to account compression program that we index.
677                        // -> we need to compute the nullifier here.
678                        create_nullifier(
679                            &n.account_hash,
680                            n.leaf_index.into(),
681                            &associated_instructions
682                                .insert_into_queues_instruction
683                                .tx_hash,
684                        )?
685                    },
686                    nullifier_queue_index: u64::MAX,
687                })
688            })
689            .collect::<Result<Vec<_>, ParseIndexerEventError>>()?,
690        input_sequence_numbers,
691    };
692
693    let nullifier_queue_indices = create_nullifier_queue_indices(
694        associated_instructions,
695        batched_transaction_event.batch_input_accounts.len(),
696    );
697
698    batched_transaction_event
699        .batch_input_accounts
700        .iter_mut()
701        .zip(nullifier_queue_indices.iter())
702        .for_each(|(context, index)| {
703            context.nullifier_queue_index = *index;
704        });
705
706    let address_queue_indices = create_address_queue_indices(
707        associated_instructions,
708        batched_transaction_event.new_addresses.len(),
709    );
710
711    batched_transaction_event
712        .new_addresses
713        .iter_mut()
714        .zip(address_queue_indices.iter())
715        .for_each(|(context, index)| {
716            context.queue_index = *index;
717        });
718
719    Ok(batched_transaction_event)
720}
721
722fn create_nullifier_queue_indices(
723    associated_instructions: &AssociatedInstructions,
724    len: usize,
725) -> Vec<u64> {
726    let input_merkle_tree_pubkeys = associated_instructions
727        .executing_system_instruction
728        .input_compressed_accounts
729        .iter()
730        .map(|x| {
731            associated_instructions
732                .executing_system_instruction
733                .accounts[x.merkle_context.merkle_tree_pubkey_index as usize]
734        })
735        .collect::<Vec<_>>();
736    let mut nullifier_queue_indices = vec![u64::MAX; len];
737    let mut internal_input_sequence_numbers = associated_instructions
738        .insert_into_queues_instruction
739        .input_sequence_numbers
740        .to_vec();
741    // For every sequence number:
742    // 1. Find every input compressed account
743    // 2. assign sequence number as nullifier queue index
744    // 3. increment the sequence number
745    internal_input_sequence_numbers.iter_mut().for_each(|seq| {
746        for (i, merkle_tree_pubkey) in input_merkle_tree_pubkeys.iter().enumerate() {
747            if *merkle_tree_pubkey == seq.tree_pubkey {
748                nullifier_queue_indices[i] = seq.seq.into();
749                seq.seq += 1;
750            }
751        }
752    });
753    nullifier_queue_indices
754}
755
756fn create_address_queue_indices(
757    associated_instructions: &AssociatedInstructions,
758    len: usize,
759) -> Vec<u64> {
760    let address_merkle_tree_pubkeys = associated_instructions
761        .insert_into_queues_instruction
762        .addresses
763        .iter()
764        .map(|x| associated_instructions.accounts[x.tree_index as usize])
765        .collect::<Vec<_>>();
766    let mut address_queue_indices = vec![u64::MAX; len];
767    let mut internal_address_sequence_numbers = associated_instructions
768        .insert_into_queues_instruction
769        .address_sequence_numbers
770        .to_vec();
771    internal_address_sequence_numbers
772        .iter_mut()
773        .for_each(|seq| {
774            for (i, merkle_tree_pubkey) in address_merkle_tree_pubkeys.iter().enumerate() {
775                if *merkle_tree_pubkey == seq.tree_pubkey {
776                    address_queue_indices[i] = seq.seq.into();
777                    seq.seq += 1;
778                }
779            }
780        });
781    address_queue_indices
782}