Skip to main content

light_event/
parse.rs

1use borsh::BorshDeserialize;
2use light_sdk_types::lca::{
3    compressed_account::{
4        CompressedAccount, CompressedAccountData, PackedCompressedAccountWithMerkleContext,
5    },
6    constants::{
7        ACCOUNT_COMPRESSION_PROGRAM_ID, CREATE_CPI_CONTEXT_ACCOUNT, LIGHT_REGISTRY_PROGRAM_ID,
8        LIGHT_SYSTEM_PROGRAM_ID, REGISTERED_PROGRAM_PDA,
9    },
10    discriminators::*,
11    instruction_data::{
12        data::{InstructionDataInvoke, OutputCompressedAccountWithPackedContext},
13        insert_into_queues::InsertIntoQueuesInstructionData,
14        with_account_info::InstructionDataInvokeCpiWithAccountInfo,
15        with_readonly::InstructionDataInvokeCpiWithReadOnly,
16    },
17    nullifier::create_nullifier,
18    Pubkey,
19};
20use light_sdk_types::LIGHT_TOKEN_PROGRAM_ID;
21use light_zero_copy::traits::ZeroCopyAt;
22
23/// Transfer2 instruction discriminator for the Light Token program.
24const TRANSFER2: u8 = 101;
25
26use super::{
27    error::ParseIndexerEventError,
28    event::{
29        BatchNullifyContext, BatchPublicTransactionEvent, MerkleTreeSequenceNumber,
30        MerkleTreeSequenceNumberV1, NewAddress, PublicTransactionEvent,
31    },
32};
33
34#[derive(Debug, Clone, PartialEq)]
35struct ExecutingSystemInstruction<'a> {
36    output_compressed_accounts: Vec<OutputCompressedAccountWithPackedContext>,
37    input_compressed_accounts: Vec<PackedCompressedAccountWithMerkleContext>,
38    is_compress: bool,
39    relay_fee: Option<u64>,
40    compress_or_decompress_lamports: Option<u64>,
41    execute_cpi_context: bool,
42    accounts: &'a [Pubkey],
43}
44
45#[derive(Debug, Clone, PartialEq, Default)]
46pub struct Indices {
47    pub system: usize,
48    pub cpi: Vec<usize>,
49    pub insert_into_queues: usize,
50    pub found_solana_system_program_instruction: bool,
51    pub found_system: bool,
52    /// Index of the token program instruction (if present, only when called from registry)
53    pub token: Option<usize>,
54    /// Whether registry program was found in the CPI chain (required for token instruction tracking)
55    pub found_registry: bool,
56}
57
58#[derive(Debug, Clone, Copy, PartialEq)]
59pub enum ProgramId {
60    LightSystem,
61    AccountCompression,
62    SolanaSystem,
63    LightToken,
64    Registry,
65    Unknown,
66}
67
68#[derive(Debug, Clone, PartialEq)]
69struct AssociatedInstructions<'a> {
70    pub executing_system_instruction: ExecutingSystemInstruction<'a>,
71    pub cpi_context_outputs: Vec<OutputCompressedAccountWithPackedContext>,
72    pub insert_into_queues_instruction: InsertIntoQueuesInstructionData<'a>,
73    pub accounts: &'a [Pubkey],
74}
75
76/// We piece the event together from 2 instructions:
77/// 1. light_system_program::{Invoke, InvokeCpi, InvokeCpiReadOnly} (one of the 3)
78/// 2. account_compression::InsertIntoQueues
79/// - We return new addresses in batched trees separately
80///   because from the PublicTransactionEvent there
81///   is no way to know which addresses are new and
82///   for batched address trees we need to index the queue of new addresses
83///   the tree&queue account only contains bloomfilters, roots and metadata.
84///
85/// Steps:
86/// 0. Wrap program ids of instructions to filter but not change the pattern
87///         system program cpi context creation ixs
88///         insert into queue ixs not by the system program
89///         instructions with less than 12 bytes ix data
90/// 1. Find associated instructions by cpi pattern.
91/// 2. Deserialize associated instructions.
92/// 3. Create batched transaction events.
93pub fn event_from_light_transaction(
94    program_ids: &[Pubkey],
95    instructions: &[Vec<u8>],
96    accounts: Vec<Vec<Pubkey>>,
97) -> Result<Option<Vec<BatchPublicTransactionEvent>>, ParseIndexerEventError> {
98    // 0. Wrap program ids of instructions to filter but not change the pattern.
99    let program_ids = wrap_program_ids(program_ids, instructions, &accounts);
100    // 1. Find associated instructions by cpi pattern.
101    let mut patterns = find_cpi_patterns(&program_ids);
102    if patterns.is_empty() {
103        return Ok(None);
104    }
105    // We searched from the last pattern to the first.
106    //      -> reverse to be in order
107    patterns.reverse();
108    // 2. Deserialize associated instructions.
109    let associated_instructions = patterns
110        .iter()
111        .map(|pattern| deserialize_associated_instructions(pattern, instructions, &accounts))
112        .collect::<Result<Vec<_>, _>>()?;
113    // 3. Create batched transaction events.
114    let batched_transaction_events = associated_instructions
115        .iter()
116        .map(|associated_instruction| create_batched_transaction_event(associated_instruction))
117        .collect::<Result<Vec<_>, _>>()?;
118
119    // // Sanity checks:
120    // // - this must not throw in production because indexing just works if all instructions are in the same transaction.
121    // // - It's ok if someone misues the cpi context account but transaction data will not be available in photon.
122    // // - if we would throw an error it would brick photon because we would not be able to index a transaction that changed queue state.
123    // // - I could add extra data to the account compression cpi to make this impossible. -> this makes sense it is more robust.
124    // // TODO: make debug
125    // batched_transaction_events.iter().for_each(|event| {
126    //     println!("event: {:?}", event);
127    //     assert_eq!(
128    //         event.event.input_compressed_account_hashes.len(),
129    //         event.batch_input_accounts.len(),
130    //         "Input hashes and input accounts length mismatch "
131    //     );
132    //     assert_eq!(
133    //         event.event.output_compressed_account_hashes.len(),
134    //         event.event.output_leaf_indices.len(),
135    //         "Output hashes and output leaf indices length mismatch "
136    //     );
137    //     assert_eq!(
138    //         event.event.output_compressed_account_hashes.len(),
139    //         event.event.output_compressed_accounts.len(),
140    //         "Output hashes and output compressed accounts length mismatch "
141    //     );
142    // });
143    Ok(Some(batched_transaction_events))
144}
145
146fn deserialize_associated_instructions<'a>(
147    indices: &Indices,
148    instructions: &'a [Vec<u8>],
149    accounts: &'a [Vec<Pubkey>],
150) -> Result<AssociatedInstructions<'a>, ParseIndexerEventError> {
151    let (insert_queues_instruction, cpi_context_outputs) = {
152        let ix = &instructions[indices.insert_into_queues];
153        if ix.len() < 12 {
154            return Err(ParseIndexerEventError::InstructionDataTooSmall(
155                ix.len(),
156                12,
157            ));
158        }
159        let discriminator: [u8; 8] = ix[0..8].try_into().unwrap();
160        if discriminator == DISCRIMINATOR_INSERT_INTO_QUEUES {
161            let (data, bytes) = InsertIntoQueuesInstructionData::zero_copy_at(&ix[12..])?;
162            let cpi_context_outputs =
163                Vec::<OutputCompressedAccountWithPackedContext>::deserialize(&mut &bytes[..])?;
164            Ok((data, cpi_context_outputs))
165        } else {
166            Err(ParseIndexerEventError::DeserializeAccountLightSystemCpiInputsError)
167        }
168    }?;
169    let exec_instruction =
170        deserialize_instruction(&instructions[indices.system], &accounts[indices.system])?;
171
172    Ok(AssociatedInstructions {
173        executing_system_instruction: exec_instruction,
174        cpi_context_outputs,
175        insert_into_queues_instruction: insert_queues_instruction,
176        // Remove signer and register program accounts.
177        accounts: &accounts[indices.insert_into_queues][2..],
178    })
179}
180
181/// Filter all system instructions which create cpi context accounts,
182/// so that we can infer that a system program instruction is a light transaction.
183/// Create new AssociatedInstructions when we find a system instruction
184/// if next instruct is solana system program isntruction followed by insert into queues is executable instruction
185/// else is cpi instruction
186/// only push into vec if insert into queues instruction is found
187pub fn find_cpi_patterns(program_ids: &[ProgramId]) -> Vec<Indices> {
188    let mut vec = Vec::new();
189    let mut next_index = usize::MAX;
190    for (last_index, program_id) in (0..program_ids.len()).rev().zip(program_ids.iter().rev()) {
191        // skip last found pattern
192        if last_index > next_index {
193            continue;
194        }
195        // In case that we encounter more than one account compression program ix
196        // before finding one or more system program ix we just overwrite.
197        if let ProgramId::AccountCompression = program_id {
198            let (res, last_index) = find_cpi_pattern(last_index, program_ids);
199            next_index = last_index;
200            if let Some(res) = res {
201                vec.push(res);
202            };
203        }
204    }
205    vec
206}
207
208/// Pattern, SYSTEM_PROGRAM_ID.., default ids .., account compression program id
209/// We search for the pattern in reverse because there can be multiple system instructions
210/// but only one account compression instruction.
211/// Start index points to ACCOUNT_COMPRESSION_PROGRAM_ID
212pub fn find_cpi_pattern(start_index: usize, program_ids: &[ProgramId]) -> (Option<Indices>, usize) {
213    let mut index_account = Indices {
214        insert_into_queues: start_index,
215        ..Default::default()
216    };
217    // Track tentative token index - will only be confirmed if registry is found
218    let mut tentative_token: Option<usize> = None;
219
220    for (index, program_id) in (0..start_index)
221        .rev()
222        .zip(program_ids[..start_index].iter().rev())
223    {
224        if let ProgramId::SolanaSystem = program_id {
225            index_account.found_solana_system_program_instruction = true;
226            continue;
227        } else if matches!(program_id, ProgramId::LightSystem)
228            && index_account.found_solana_system_program_instruction
229            && !index_account.found_system
230        {
231            index_account.system = index;
232            index_account.found_system = true;
233        } else if index_account.found_system && matches!(program_id, ProgramId::LightSystem) {
234            index_account.cpi.push(index);
235        } else if index_account.found_system && matches!(program_id, ProgramId::LightToken) {
236            // Token program Transfer2 instruction in the CPI chain.
237            // Track tentatively - will only be confirmed if registry is found later.
238            // Only track the first one (closest to system instruction).
239            if tentative_token.is_none() {
240                tentative_token = Some(index);
241            }
242        } else if index_account.found_system && matches!(program_id, ProgramId::Registry) {
243            // Registry program instruction - confirms token tracking for ATA owner extraction.
244            // Since we search backwards, registry is found after token in the search order,
245            // but registry is the outer caller in the actual CPI chain.
246            index_account.found_registry = true;
247            // Confirm the tentative token index now that we found registry
248            if index_account.token.is_none() {
249                index_account.token = tentative_token;
250            }
251        } else if matches!(program_id, ProgramId::AccountCompression) && index_account.found_system
252        {
253            // Possibly found next light transaction.
254            return (Some(index_account), index);
255        } else if !index_account.found_system {
256            // If no system program found pattern incomplete.
257            // Else search for cpi instructions until we find account compression program id.
258            return (None, index);
259        }
260    }
261    if index_account.found_system {
262        (Some(index_account), 0)
263    } else {
264        (None, 0)
265    }
266}
267
268pub fn wrap_program_ids(
269    program_ids: &[Pubkey],
270    instructions: &[Vec<u8>],
271    accounts: &[Vec<Pubkey>],
272) -> Vec<ProgramId> {
273    let mut vec = Vec::new();
274    for ((instruction, program_id), accounts) in instructions
275        .iter()
276        .zip(program_ids.iter())
277        .zip(accounts.iter())
278    {
279        if instruction.len() < 12 {
280            vec.push(ProgramId::Unknown);
281            continue;
282        }
283        let discriminator: [u8; 8] = instruction[0..8].try_into().unwrap();
284        if program_id == &Pubkey::default() {
285            vec.push(ProgramId::SolanaSystem);
286        } else if program_id == &Pubkey::from(LIGHT_SYSTEM_PROGRAM_ID) {
287            if discriminator == CREATE_CPI_CONTEXT_ACCOUNT {
288                vec.push(ProgramId::Unknown);
289            } else {
290                vec.push(ProgramId::LightSystem);
291            }
292        } else if program_id == &Pubkey::from(ACCOUNT_COMPRESSION_PROGRAM_ID) {
293            if discriminator == DISCRIMINATOR_INSERT_INTO_QUEUES
294                && accounts.len() > 2
295                && accounts[1] == Pubkey::from(REGISTERED_PROGRAM_PDA)
296            {
297                vec.push(ProgramId::AccountCompression);
298            } else {
299                vec.push(ProgramId::Unknown);
300            }
301        } else if program_id == &Pubkey::from(LIGHT_TOKEN_PROGRAM_ID) {
302            // Token program Transfer2 instruction
303            if !instruction.is_empty() && instruction[0] == TRANSFER2 {
304                vec.push(ProgramId::LightToken);
305            } else {
306                vec.push(ProgramId::Unknown);
307            }
308        } else if program_id == &Pubkey::from(LIGHT_REGISTRY_PROGRAM_ID) {
309            vec.push(ProgramId::Registry);
310        } else {
311            vec.push(ProgramId::Unknown);
312        }
313    }
314    vec
315}
316
317fn deserialize_instruction<'a>(
318    instruction: &'a [u8],
319    accounts: &'a [Pubkey],
320) -> Result<ExecutingSystemInstruction<'a>, ParseIndexerEventError> {
321    if instruction.len() < 12 {
322        return Err(ParseIndexerEventError::InstructionDataTooSmall(
323            instruction.len(),
324            12,
325        ));
326    }
327    let instruction_discriminator = instruction[0..8].try_into().unwrap();
328    let instruction = instruction.split_at(8).1;
329    match instruction_discriminator {
330        // Cannot be exucted with cpi context -> executing tx
331        DISCRIMINATOR_INVOKE => {
332            if accounts.len() < 9 {
333                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
334            }
335            let accounts = accounts.split_at(9).1;
336            // Skips vec size bytes
337            let data = InstructionDataInvoke::deserialize(&mut &instruction[4..])?;
338            Ok(ExecutingSystemInstruction {
339                output_compressed_accounts: data.output_compressed_accounts,
340                input_compressed_accounts: data.input_compressed_accounts_with_merkle_context,
341                is_compress: data.is_compress,
342                relay_fee: data.relay_fee,
343                compress_or_decompress_lamports: data.compress_or_decompress_lamports,
344                execute_cpi_context: false,
345                accounts,
346            })
347        }
348        DISCRIMINATOR_INVOKE_CPI => {
349            if accounts.len() < 11 {
350                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
351            }
352            let accounts = accounts.split_at(11).1;
353            let data = light_sdk_types::lca::instruction_data::invoke_cpi::InstructionDataInvokeCpi::deserialize(
354                &mut &instruction[4..],
355            )?;
356            Ok(ExecutingSystemInstruction {
357                output_compressed_accounts: data.output_compressed_accounts,
358                input_compressed_accounts: data.input_compressed_accounts_with_merkle_context,
359                is_compress: data.is_compress,
360                relay_fee: data.relay_fee,
361                compress_or_decompress_lamports: data.compress_or_decompress_lamports,
362                execute_cpi_context: data.cpi_context.is_some(),
363                accounts,
364            })
365        }
366        DISCRIMINATOR_INVOKE_CPI_WITH_READ_ONLY => {
367            // Min len for a small instruction 3 accounts + 1 tree or queue
368            // Fee payer + authority + registered program + account compression program + account compression authority
369            if accounts.len() < 5 {
370                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
371            }
372            let data: InstructionDataInvokeCpiWithReadOnly =
373                InstructionDataInvokeCpiWithReadOnly::deserialize(&mut &instruction[..])?;
374            let system_accounts_len = if data.mode == 0 {
375                11
376            } else {
377                let mut len = 6; // fee_payer + authority + registered_program + account_compression_program + account_compression_authority + system_program
378                if data.compress_or_decompress_lamports > 0 {
379                    len += 1;
380                }
381                if !data.is_compress && data.compress_or_decompress_lamports > 0 {
382                    len += 1;
383                }
384                if data.with_cpi_context {
385                    len += 1;
386                }
387                len
388            };
389
390            let accounts = accounts.split_at(system_accounts_len).1;
391            Ok(ExecutingSystemInstruction {
392                output_compressed_accounts: data.output_compressed_accounts,
393                input_compressed_accounts: data
394                    .input_compressed_accounts
395                    .iter()
396                    .map(|x| {
397                        x.into_packed_compressed_account_with_merkle_context(
398                            data.invoking_program_id,
399                        )
400                    })
401                    .collect::<Vec<_>>(),
402                is_compress: data.is_compress && data.compress_or_decompress_lamports > 0,
403                relay_fee: None,
404                compress_or_decompress_lamports: if data.compress_or_decompress_lamports == 0 {
405                    None
406                } else {
407                    Some(data.compress_or_decompress_lamports)
408                },
409                execute_cpi_context: data.with_cpi_context,
410                accounts,
411            })
412        }
413        INVOKE_CPI_WITH_ACCOUNT_INFO_INSTRUCTION => {
414            // Min len for a small instruction 4 accounts + 1 tree or queue
415            // Fee payer + authority + registered program + account compression program + account compression authority
416            if accounts.len() < 5 {
417                return Err(ParseIndexerEventError::DeserializeSystemInstructionError);
418            }
419            let data: InstructionDataInvokeCpiWithAccountInfo =
420                InstructionDataInvokeCpiWithAccountInfo::deserialize(&mut &instruction[..])?;
421            let system_accounts_len = if data.mode == 0 {
422                11
423            } else {
424                let mut len = 6; // fee_payer + authority + registered_program + account_compression_program + account_compression_authority + system_program
425                if data.compress_or_decompress_lamports > 0 {
426                    len += 1;
427                }
428                if !data.is_compress && data.compress_or_decompress_lamports > 0 {
429                    len += 1;
430                }
431                if data.with_cpi_context {
432                    len += 1;
433                }
434                len
435            };
436            let accounts = accounts.split_at(system_accounts_len).1;
437
438            let instruction = ExecutingSystemInstruction {
439                output_compressed_accounts: data
440                    .account_infos
441                    .iter()
442                    .filter(|x| x.output.is_some())
443                    .map(|x| {
444                        let account = x.output.as_ref().unwrap();
445                        OutputCompressedAccountWithPackedContext {
446                            compressed_account: CompressedAccount {
447                                address: x.address,
448                                owner: data.invoking_program_id,
449                                lamports: account.lamports,
450                                data: Some(CompressedAccountData {
451                                    discriminator: account.discriminator,
452                                    data: account.data.clone(),
453                                    data_hash: account.data_hash,
454                                }),
455                            },
456                            merkle_tree_index: account.output_merkle_tree_index,
457                        }
458                    })
459                    .collect::<Vec<_>>(),
460                input_compressed_accounts: data
461                    .account_infos
462                    .iter()
463                    .filter(|x| x.input.is_some())
464                    .map(|x| {
465                        let account = x.input.as_ref().unwrap();
466                        PackedCompressedAccountWithMerkleContext {
467                            compressed_account: CompressedAccount {
468                                address: x.address,
469                                owner: data.invoking_program_id,
470                                lamports: account.lamports,
471                                data: Some(CompressedAccountData {
472                                    discriminator: account.discriminator,
473                                    data: vec![],
474                                    data_hash: account.data_hash,
475                                }),
476                            },
477                            read_only: false,
478                            root_index: account.root_index,
479                            merkle_context: account.merkle_context,
480                        }
481                    })
482                    .collect::<Vec<_>>(),
483                is_compress: data.is_compress && data.compress_or_decompress_lamports > 0,
484                relay_fee: None,
485                compress_or_decompress_lamports: if data.compress_or_decompress_lamports == 0 {
486                    None
487                } else {
488                    Some(data.compress_or_decompress_lamports)
489                },
490                execute_cpi_context: data.with_cpi_context,
491                accounts,
492            };
493
494            Ok(instruction)
495        }
496        _ => Err(ParseIndexerEventError::DeserializeSystemInstructionError),
497    }
498}
499
500fn create_batched_transaction_event(
501    associated_instructions: &AssociatedInstructions,
502) -> Result<BatchPublicTransactionEvent, ParseIndexerEventError> {
503    let input_sequence_numbers = associated_instructions
504        .insert_into_queues_instruction
505        .input_sequence_numbers
506        .iter()
507        .map(From::from)
508        .filter(|x: &MerkleTreeSequenceNumber| !(*x).is_empty())
509        .collect::<Vec<MerkleTreeSequenceNumber>>();
510    let mut batched_transaction_event = BatchPublicTransactionEvent {
511        event: PublicTransactionEvent {
512            input_compressed_account_hashes: associated_instructions
513                .insert_into_queues_instruction
514                .nullifiers
515                .iter()
516                .map(|x| x.account_hash)
517                .collect(),
518            output_compressed_account_hashes: associated_instructions
519                .insert_into_queues_instruction
520                .leaves
521                .iter()
522                .map(|x| x.leaf)
523                .collect(),
524            output_compressed_accounts: [
525                associated_instructions.cpi_context_outputs.clone(),
526                associated_instructions
527                    .executing_system_instruction
528                    .output_compressed_accounts
529                    .clone(),
530            ]
531            .concat(),
532            output_leaf_indices: associated_instructions
533                .insert_into_queues_instruction
534                .output_leaf_indices
535                .iter()
536                .map(|x| u32::from(*x))
537                .collect(),
538            sequence_numbers: associated_instructions
539                .insert_into_queues_instruction
540                .output_sequence_numbers
541                .iter()
542                .map(From::from)
543                .filter(|x: &MerkleTreeSequenceNumber| !(*x).is_empty())
544                .map(|x| MerkleTreeSequenceNumberV1 {
545                    seq: x.seq,
546                    tree_pubkey: x.tree_pubkey,
547                })
548                .collect(),
549            relay_fee: associated_instructions
550                .executing_system_instruction
551                .relay_fee,
552            is_compress: associated_instructions
553                .executing_system_instruction
554                .is_compress,
555            compress_or_decompress_lamports: associated_instructions
556                .executing_system_instruction
557                .compress_or_decompress_lamports,
558            pubkey_array: associated_instructions
559                .executing_system_instruction
560                .accounts
561                .to_vec(),
562            message: None,
563            ata_owners: Default::default(),
564        },
565        tx_hash: associated_instructions
566            .insert_into_queues_instruction
567            .tx_hash,
568        new_addresses: associated_instructions
569            .insert_into_queues_instruction
570            .addresses
571            .iter()
572            .map(|x| NewAddress {
573                address: x.address,
574                mt_pubkey: associated_instructions.accounts[x.tree_index as usize],
575                queue_index: u64::MAX,
576            })
577            .collect::<Vec<_>>(),
578        address_sequence_numbers: associated_instructions
579            .insert_into_queues_instruction
580            .address_sequence_numbers
581            .iter()
582            .map(From::from)
583            .filter(|x: &MerkleTreeSequenceNumber| !(*x).is_empty())
584            .collect::<Vec<MerkleTreeSequenceNumber>>(),
585        batch_input_accounts: associated_instructions
586            .insert_into_queues_instruction
587            .nullifiers
588            .iter()
589            .filter(|x| {
590                input_sequence_numbers.iter().any(|y| {
591                    y.tree_pubkey == associated_instructions.accounts[x.tree_index as usize]
592                })
593            })
594            .map(|n| {
595                Ok(BatchNullifyContext {
596                    tx_hash: associated_instructions
597                        .insert_into_queues_instruction
598                        .tx_hash,
599                    account_hash: n.account_hash,
600                    nullifier: {
601                        // The nullifier is computed inside the account compression program.
602                        // -> it is not part of the cpi system to account compression program that we index.
603                        // -> we need to compute the nullifier here.
604                        create_nullifier(
605                            &n.account_hash,
606                            n.leaf_index.into(),
607                            &associated_instructions
608                                .insert_into_queues_instruction
609                                .tx_hash,
610                        )?
611                    },
612                    nullifier_queue_index: u64::MAX,
613                })
614            })
615            .collect::<Result<Vec<_>, ParseIndexerEventError>>()?,
616        input_sequence_numbers,
617    };
618
619    let nullifier_queue_indices = create_nullifier_queue_indices(
620        associated_instructions,
621        batched_transaction_event.batch_input_accounts.len(),
622    );
623
624    batched_transaction_event
625        .batch_input_accounts
626        .iter_mut()
627        .zip(nullifier_queue_indices.iter())
628        .for_each(|(context, index)| {
629            context.nullifier_queue_index = *index;
630        });
631
632    let address_queue_indices = create_address_queue_indices(
633        associated_instructions,
634        batched_transaction_event.new_addresses.len(),
635    );
636
637    batched_transaction_event
638        .new_addresses
639        .iter_mut()
640        .zip(address_queue_indices.iter())
641        .for_each(|(context, index)| {
642            context.queue_index = *index;
643        });
644
645    Ok(batched_transaction_event)
646}
647
648fn create_nullifier_queue_indices(
649    associated_instructions: &AssociatedInstructions,
650    len: usize,
651) -> Vec<u64> {
652    let input_merkle_tree_pubkeys = associated_instructions
653        .executing_system_instruction
654        .input_compressed_accounts
655        .iter()
656        .map(|x| {
657            associated_instructions
658                .executing_system_instruction
659                .accounts[x.merkle_context.merkle_tree_pubkey_index as usize]
660        })
661        .collect::<Vec<_>>();
662    let mut nullifier_queue_indices = vec![u64::MAX; len];
663    let mut internal_input_sequence_numbers = associated_instructions
664        .insert_into_queues_instruction
665        .input_sequence_numbers
666        .to_vec();
667    // Walk input_compressed_accounts in order, assigning sequence numbers to batch
668    // accounts using a compact write index. Non-batch (legacy/concurrent) accounts
669    // have no matching sequence number entry and are skipped.
670    let mut batch_idx = 0usize;
671    for merkle_tree_pubkey in input_merkle_tree_pubkeys.iter() {
672        if let Some(seq) = internal_input_sequence_numbers
673            .iter_mut()
674            .find(|s| s.tree_pubkey == *merkle_tree_pubkey)
675        {
676            nullifier_queue_indices[batch_idx] = seq.seq.into();
677            seq.seq += 1;
678            batch_idx += 1;
679        }
680    }
681    nullifier_queue_indices
682}
683
684fn create_address_queue_indices(
685    associated_instructions: &AssociatedInstructions,
686    len: usize,
687) -> Vec<u64> {
688    let address_merkle_tree_pubkeys = associated_instructions
689        .insert_into_queues_instruction
690        .addresses
691        .iter()
692        .map(|x| associated_instructions.accounts[x.tree_index as usize])
693        .collect::<Vec<_>>();
694    let mut address_queue_indices = vec![u64::MAX; len];
695    let mut internal_address_sequence_numbers = associated_instructions
696        .insert_into_queues_instruction
697        .address_sequence_numbers
698        .to_vec();
699    internal_address_sequence_numbers
700        .iter_mut()
701        .for_each(|seq| {
702            for (i, merkle_tree_pubkey) in address_merkle_tree_pubkeys.iter().enumerate() {
703                if *merkle_tree_pubkey == seq.tree_pubkey.into() {
704                    address_queue_indices[i] = seq.seq.into();
705                    seq.seq += 1;
706                }
707            }
708        });
709    address_queue_indices
710}