data_anchor_client/
blober_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    sync::Arc,
5    time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use data_anchor_api::{
12    extract_relevant_instructions, get_account_at_index, BlobsByBlober, BlobsByPayer,
13    CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
14};
15use data_anchor_blober::{
16    find_blob_address, find_blober_address,
17    instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
18    state::blober::Blober,
19    BLOB_ACCOUNT_INSTRUCTION_IDX, BLOB_BLOBER_INSTRUCTION_IDX, CHUNK_SIZE,
20    COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE,
21};
22use futures::StreamExt;
23use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
24use solana_cli_config::Config;
25use solana_client::rpc_config::RpcTransactionConfig;
26use solana_rpc_client::nonblocking::rpc_client::RpcClient;
27use solana_rpc_client_api::config::RpcBlockConfig;
28use solana_sdk::{
29    commitment_config::CommitmentConfig,
30    message::VersionedMessage,
31    pubkey::Pubkey,
32    signature::{Keypair, Signature},
33    signer::Signer,
34};
35use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
36use tracing::{info_span, Instrument, Span};
37
38use crate::{
39    batch_client::{BatchClient, SuccessfulTransaction},
40    constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
41    fees::{Fee, FeeStrategy, Lamports, Priority},
42    helpers::{
43        check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
44        get_unique_timestamp,
45    },
46    tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
47    types::{IndexerError, TransactionType, UploadBlobError},
48    BloberClientError, BloberClientResult, LedgerDataBlobError,
49};
50
51#[derive(Builder, Clone)]
52pub struct BloberClient {
53    #[builder(getter(name = get_payer, vis = ""))]
54    pub(crate) payer: Arc<Keypair>,
55    pub(crate) program_id: Pubkey,
56    pub(crate) rpc_client: Arc<RpcClient>,
57    pub(crate) batch_client: BatchClient,
58    // Optional for the sake of testing, because in some tests indexer client is not used
59    pub(crate) indexer_client: Option<Arc<WsClient>>,
60    #[builder(default = false)]
61    pub(crate) helius_fee_estimate: bool,
62}
63
64impl<State: blober_client_builder::State> BloberClientBuilder<State> {
65    /// Adds an indexer client to the builder based on the given indexer URL.
66    ///
67    /// # Example
68    ///
69    /// ```rust
70    /// use blober_client::BloberClient;
71    ///
72    /// let builder_with_indexer = BloberClient::builder()
73    ///     .indexer_from_url("ws://localhost:8080")
74    ///     .await?;
75    /// ```
76    pub async fn indexer_from_url(
77        self,
78        indexer_url: &str,
79    ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
80    where
81        State::IndexerClient: IsUnset,
82    {
83        let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
84        Ok(self.indexer_client(Arc::new(indexer_client)))
85    }
86
87    /// Builds a new `BloberClient` with an RPC client and a batch client built from the given
88    /// Solana cli [`Config`].
89    ///
90    /// # Example
91    ///
92    /// ```rust
93    /// use std::sync::Arc;
94    ///
95    /// use blober_client::{BloberClient};
96    /// use solana_cli_config::Config;
97    /// use solana_sdk::{pubkey::Pubkey, signature::Keypair};
98    ///
99    /// let payer = Arc::new(Keypair::new());
100    /// let program_id = Pubkey::new_unique();
101    /// let solana_config = Config::default();
102    /// let client = BloberClient::builder()
103    ///     .payer(payer)
104    ///     .program_id(program_id)
105    ///     .build_with_config(solana_config)
106    ///     .await?;
107    /// ```
108    pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
109    where
110        State::Payer: IsSet,
111        State::ProgramId: IsSet,
112        State::RpcClient: IsUnset,
113        State::BatchClient: IsUnset,
114    {
115        let rpc_client = Arc::new(RpcClient::new_with_commitment(
116            solana_config.json_rpc_url.clone(),
117            CommitmentConfig::from_str(&solana_config.commitment)?,
118        ));
119        let payer = self.get_payer().clone();
120        Ok(self
121            .rpc_client(rpc_client.clone())
122            .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
123            .build())
124    }
125
126    pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
127    where
128        State::HeliusFeeEstimate: IsUnset,
129    {
130        self.helius_fee_estimate(true)
131    }
132}
133
134impl BloberClient {
135    /// Returns the underlaying [`RpcClient`].
136    pub fn rpc_client(&self) -> Arc<RpcClient> {
137        self.rpc_client.clone()
138    }
139
140    /// Returns the transaction payer [`Keypair`].
141    pub fn payer(&self) -> Arc<Keypair> {
142        self.payer.clone()
143    }
144
145    /// Initializes a new [`Blober`] PDA account.
146    pub async fn initialize_blober(
147        &self,
148        fee_strategy: FeeStrategy,
149        namespace: &str,
150        timeout: Option<Duration>,
151    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
152        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
153
154        let fee_strategy = self
155            .convert_fee_strategy_to_fixed(
156                fee_strategy,
157                &[blober],
158                TransactionType::InitializeBlober,
159            )
160            .in_current_span()
161            .await?;
162
163        let msg = Initialize::build_message(MessageArguments::new(
164            self.program_id,
165            blober,
166            &self.payer,
167            self.rpc_client.clone(),
168            fee_strategy,
169            self.helius_fee_estimate,
170            (namespace.to_owned(), blober),
171        ))
172        .await
173        .expect("infallible with a fixed fee strategy");
174
175        let span = info_span!(parent: Span::current(), "initialize_blober");
176        Ok(check_outcomes(
177            self.batch_client
178                .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
179                .instrument(span)
180                .await,
181        )
182        .map_err(UploadBlobError::InitializeBlober)?)
183    }
184
185    /// Closes a [`Blober`] PDA account.
186    pub async fn close_blober(
187        &self,
188        fee_strategy: FeeStrategy,
189        namespace: &str,
190        timeout: Option<Duration>,
191    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
192        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
193
194        let fee_strategy = self
195            .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
196            .in_current_span()
197            .await?;
198
199        let msg = Close::build_message(MessageArguments::new(
200            self.program_id,
201            blober,
202            &self.payer,
203            self.rpc_client.clone(),
204            fee_strategy,
205            self.helius_fee_estimate,
206            (),
207        ))
208        .await
209        .expect("infallible with a fixed fee strategy");
210
211        let span = info_span!(parent: Span::current(), "close_blober");
212        Ok(check_outcomes(
213            self.batch_client
214                .send(vec![(TransactionType::CloseBlober, msg)], timeout)
215                .instrument(span)
216                .await,
217        )
218        .map_err(UploadBlobError::CloseBlober)?)
219    }
220
221    /// Uploads a blob of data with the given [`Blober`] PDA account.
222    /// Under the hood it creates a new [`data_anchor_blober::state::blob::Blob`] PDA which stores a
223    /// incremental hash of the chunks from the blob data. On completion of the blob upload, the
224    /// blob PDA gets closed sending it's funds back to the [`BloberClient::payer`].
225    /// If the blob upload fails, the blob PDA gets discarded and the funds also get sent to the
226    /// [`BloberClient::payer`].
227    pub async fn upload_blob(
228        &self,
229        blob_data: &[u8],
230        fee_strategy: FeeStrategy,
231        namespace: &str,
232        timeout: Option<Duration>,
233    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
234        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
235        let timestamp = get_unique_timestamp();
236
237        let blob = find_blob_address(
238            self.program_id,
239            self.payer.pubkey(),
240            blober,
241            timestamp,
242            blob_data.len(),
243        );
244
245        let upload_messages = self
246            .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
247            .await?;
248
249        let res = self
250            .do_upload(upload_messages, timeout)
251            .in_current_span()
252            .await;
253
254        if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
255            self.discard_blob(fee_strategy, blob, namespace, timeout)
256                .await
257        } else {
258            res
259        }
260    }
261
262    /// Discards a [`data_anchor_blober::state::blob::Blob`] PDA account registered with the provided
263    /// [`Blober`] PDA account.
264    pub async fn discard_blob(
265        &self,
266        fee_strategy: FeeStrategy,
267        blob: Pubkey,
268        namespace: &str,
269        timeout: Option<Duration>,
270    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
271        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
272
273        let fee_strategy = self
274            .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
275            .in_current_span()
276            .await?;
277
278        let msg = DiscardBlob::build_message(MessageArguments::new(
279            self.program_id,
280            blober,
281            &self.payer,
282            self.rpc_client.clone(),
283            fee_strategy,
284            self.helius_fee_estimate,
285            blob,
286        ))
287        .in_current_span()
288        .await
289        .expect("infallible with a fixed fee strategy");
290
291        let span = info_span!(parent: Span::current(), "discard_blob");
292
293        Ok(check_outcomes(
294            self.batch_client
295                .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
296                .instrument(span)
297                .await,
298        )
299        .map_err(UploadBlobError::DiscardBlob)?)
300    }
301
302    /// Estimates fees for uploading a blob of the size `blob_size` with the given `priority`.
303    /// This whole functions is basically a simulation that doesn't run anything. Instead of executing transactions,
304    /// it just sums the expected fees and number of signatures.
305    ///
306    /// The [`data_anchor_blober::state::blob::Blob`] PDA account is always newly created, so for estimating compute fees
307    /// we don't even need the real keypair, any unused pubkey will do.
308    pub async fn estimate_fees(
309        &self,
310        blob_size: usize,
311        blober: Pubkey,
312        priority: Priority,
313    ) -> BloberClientResult<Fee> {
314        let prioritization_fee_rate = priority
315            .get_priority_fee_estimate(
316                &self.rpc_client,
317                &[Pubkey::new_unique(), blober, self.payer.pubkey()],
318                self.helius_fee_estimate,
319            )
320            .await?;
321
322        let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
323
324        let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
325            (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
326        } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
327            (
328                CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
329                CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
330            )
331        } else {
332            (
333                DeclareBlob::COMPUTE_UNIT_LIMIT
334                    + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
335                    + CompoundFinalize::COMPUTE_UNIT_LIMIT,
336                DeclareBlob::NUM_SIGNATURES
337                    + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
338                    + CompoundFinalize::NUM_SIGNATURES,
339            )
340        };
341
342        // The base Solana transaction fee = 5000.
343        // Reference link: https://solana.com/docs/core/fees#:~:text=While%20transaction%20fees%20are%20paid,of%205k%20lamports%20per%20signature.
344        let price_per_signature = Lamports::new(5000);
345
346        let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
347
348        Ok(Fee {
349            num_signatures,
350            price_per_signature,
351            compute_unit_limit,
352            prioritization_fee_rate,
353            blob_account_size,
354        })
355    }
356
357    /// Returns the raw blob data from the ledger for the given signatures.
358    pub async fn get_ledger_blobs_from_signatures(
359        &self,
360        namespace: &str,
361        payer_pubkey: Option<Pubkey>,
362        signatures: Vec<Signature>,
363    ) -> BloberClientResult<Vec<u8>> {
364        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
365        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
366
367        let relevant_transactions = futures::stream::iter(signatures)
368            .map(|signature| async move {
369                self.rpc_client
370                    .get_transaction_with_config(
371                        &signature,
372                        RpcTransactionConfig {
373                            commitment: Some(self.rpc_client.commitment()),
374                            encoding: Some(UiTransactionEncoding::Base58),
375                            ..Default::default()
376                        },
377                    )
378                    .await
379            })
380            .buffer_unordered(DEFAULT_CONCURRENCY)
381            .collect::<Vec<_>>()
382            .await
383            .into_iter()
384            .collect::<Result<Vec<_>, _>>()?;
385
386        let relevant_instructions = extract_relevant_instructions(
387            &self.program_id,
388            &relevant_transactions
389                .iter()
390                .filter_map(|encoded| match &encoded.transaction.meta {
391                    Some(meta) if meta.status.is_err() => None,
392                    _ => encoded.transaction.transaction.decode(),
393                })
394                .collect::<Vec<_>>(),
395        );
396
397        let declares = relevant_instructions
398            .iter()
399            .filter_map(|instruction| {
400                (instruction.blober == blober
401                    && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
402                .then_some(instruction.blob)
403            })
404            .collect::<Vec<Pubkey>>();
405
406        let Some(blob) = declares.first() else {
407            return Err(LedgerDataBlobError::DeclareNotFound.into());
408        };
409
410        if declares.len() > 1 {
411            return Err(LedgerDataBlobError::MultipleDeclares.into());
412        }
413
414        if relevant_instructions
415            .iter()
416            .filter(|instruction| {
417                matches!(
418                    instruction.instruction,
419                    RelevantInstruction::FinalizeBlob(_)
420                )
421            })
422            .count()
423            > 1
424        {
425            return Err(LedgerDataBlobError::MultipleFinalizes.into());
426        }
427
428        Ok(get_blob_data_from_instructions(
429            &relevant_instructions,
430            blober,
431            *blob,
432        )?)
433    }
434
435    /// Fetches all blobs finalized in a given slot from the ledger.
436    pub async fn get_ledger_blobs(
437        &self,
438        slot: u64,
439        namespace: &str,
440        payer_pubkey: Option<Pubkey>,
441        lookback_slots: Option<u64>,
442    ) -> BloberClientResult<Vec<Vec<u8>>> {
443        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
444        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
445
446        let block_config = RpcBlockConfig {
447            commitment: Some(self.rpc_client.commitment()),
448            encoding: Some(UiTransactionEncoding::Base58),
449            ..Default::default()
450        };
451        let block = self
452            .rpc_client
453            .get_block_with_config(slot, block_config)
454            .await?;
455
456        let Some(transactions) = block.transactions else {
457            // If there are no transactions in the block, that means there are no blobs to fetch.
458            return Ok(Vec::new());
459        };
460
461        let relevant_instructions = extract_relevant_instructions(
462            &self.program_id,
463            &transactions
464                .iter()
465                .filter_map(|tx| match &tx.meta {
466                    Some(meta) if meta.status.is_err() => None,
467                    _ => tx.transaction.decode(),
468                })
469                .collect::<Vec<_>>(),
470        );
471        let finalized_blobs = relevant_instructions
472            .iter()
473            .filter_map(|instruction| {
474                (instruction.blober == blober
475                    && matches!(
476                        instruction.instruction,
477                        RelevantInstruction::FinalizeBlob(_)
478                    ))
479                .then_some(instruction.blob)
480            })
481            .collect::<HashSet<Pubkey>>();
482
483        let mut relevant_instructions_map = HashMap::new();
484        filter_relevant_instructions(
485            relevant_instructions,
486            &finalized_blobs,
487            &mut relevant_instructions_map,
488        );
489
490        let mut blobs = HashMap::with_capacity(finalized_blobs.len());
491        for blob in &finalized_blobs {
492            let instructions = relevant_instructions_map
493                .get(blob)
494                .expect("This should never happen since we at least have the finalize instruction");
495
496            if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
497                blobs.insert(blob, blob_data);
498            }
499        }
500
501        // If all blobs are found, return them.
502        if blobs.len() == finalized_blobs.len() {
503            return Ok(blobs.values().cloned().collect());
504        }
505
506        let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
507
508        let block_slots = self
509            .rpc_client
510            .get_blocks_with_commitment(
511                slot - lookback_slots,
512                Some(slot - 1),
513                self.rpc_client.commitment(),
514            )
515            .await?;
516
517        for slot in block_slots.into_iter().rev() {
518            let block = self
519                .rpc_client
520                .get_block_with_config(slot, block_config)
521                .await?;
522            let Some(transactions) = block.transactions else {
523                // If there are no transactions in the block, go to the next block.
524                continue;
525            };
526            let new_relevant_instructions = extract_relevant_instructions(
527                &self.program_id,
528                &transactions
529                    .iter()
530                    .filter_map(|tx| match &tx.meta {
531                        Some(meta) if meta.status.is_err() => None,
532                        _ => tx.transaction.decode(),
533                    })
534                    .collect::<Vec<_>>(),
535            );
536            filter_relevant_instructions(
537                new_relevant_instructions,
538                &finalized_blobs,
539                &mut relevant_instructions_map,
540            );
541            for blob in &finalized_blobs {
542                if blobs.contains_key(blob) {
543                    continue;
544                }
545                let instructions = relevant_instructions_map.get(blob).expect(
546                    "This should never happen since we at least have the finalize instruction",
547                );
548                println!("total {}", instructions.len());
549
550                if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
551                {
552                    blobs.insert(blob, blob_data);
553                }
554            }
555            if blobs.len() == finalized_blobs.len() {
556                break;
557            }
558        }
559
560        Ok(blobs.values().cloned().collect())
561    }
562
563    /// Fetches all blobs for a given slot from the [`IndexerRpcClient`].
564    pub async fn get_blobs(
565        &self,
566        slot: u64,
567        namespace: &str,
568        payer_pubkey: Option<Pubkey>,
569    ) -> BloberClientResult<Vec<Vec<u8>>> {
570        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
571        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
572
573        loop {
574            let blobs = self
575                .indexer()
576                .get_blobs(blober, slot)
577                .await
578                .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
579            if let Some(blobs) = blobs {
580                return Ok(blobs);
581            }
582            tokio::time::sleep(Duration::from_millis(100)).await;
583        }
584    }
585
586    /// Fetches blobs for a given [`BlobsByBlober`] from the [`IndexerRpcClient`].
587    pub async fn get_blobs_by_blober(
588        &self,
589        blober_blobs: BlobsByBlober,
590    ) -> BloberClientResult<Vec<Vec<u8>>> {
591        let blober = blober_blobs.blober;
592
593        self.indexer()
594            .get_blobs_by_blober(blober_blobs)
595            .await
596            .map_err(|e| IndexerError::BlobsForBlober(blober.to_string(), e.to_string()).into())
597    }
598
599    /// Fetches blobs for a given [`BlobsByPayer`] from the [`IndexerRpcClient`].
600    pub async fn get_blobs_by_payer(
601        &self,
602        payer_blobs: BlobsByPayer,
603    ) -> BloberClientResult<Vec<Vec<u8>>> {
604        let payer = payer_blobs.payer;
605
606        self.indexer()
607            .get_blobs_by_payer(payer_blobs)
608            .await
609            .map_err(|e| IndexerError::BlobsForPayer(payer.to_string(), e.to_string()).into())
610    }
611
612    /// Fetches compound proof for a given slot from the [`IndexerRpcClient`].
613    pub async fn get_slot_proof(
614        &self,
615        slot: u64,
616        namespace: &str,
617        payer_pubkey: Option<Pubkey>,
618    ) -> BloberClientResult<CompoundProof> {
619        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
620        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
621
622        loop {
623            let proof = self
624                .indexer()
625                .get_proof(blober, slot)
626                .await
627                .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
628            if let Some(proofs) = proof {
629                return Ok(proofs);
630            }
631            tokio::time::sleep(Duration::from_millis(100)).await;
632        }
633    }
634
635    /// Fetches compound proof for a given blob PDA [`Pubkey`] from the [`IndexerRpcClient`].
636    pub async fn get_blob_proof(&self, blob: Pubkey) -> BloberClientResult<Option<CompoundProof>> {
637        self.indexer()
638            .get_proof_for_blob(blob)
639            .await
640            .map_err(|e| IndexerError::ProofForBlob(blob.to_string(), e.to_string()).into())
641    }
642
643    /// Fetches blob messages for a given slot
644    /// Returns a tuple of ([`Pubkey`], [`VersionedMessage`]) where the Pubkey is the address of
645    /// the [`data_anchor_blober::state::blob::Blob`] account and the VersionedMessage is the message
646    /// that included the [`data_anchor_blober::instruction::FinalizeBlob`] instruction.
647    pub async fn get_blob_messages(
648        &self,
649        slot: u64,
650        namespace: &str,
651        payer_pubkey: Option<Pubkey>,
652    ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
653        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
654        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
655
656        let block: EncodedConfirmedBlock = self
657            .rpc_client
658            .get_block_with_config(
659                slot,
660                RpcBlockConfig {
661                    commitment: Some(self.rpc_client.commitment()),
662                    encoding: Some(UiTransactionEncoding::Base58),
663                    ..Default::default()
664                },
665            )
666            .await?
667            .into();
668
669        let finalized = block
670            .transactions
671            .iter()
672            .filter_map(|tx| match &tx.meta {
673                Some(meta) if meta.status.is_err() => None,
674                _ => tx.transaction.decode(),
675            })
676            .filter_map(|tx| {
677                let instructions = tx
678                    .message
679                    .instructions()
680                    .iter()
681                    .filter_map(|compiled_instruction| {
682                        Some(RelevantInstructionWithAccounts {
683                            blob: get_account_at_index(
684                                &tx,
685                                compiled_instruction,
686                                BLOB_ACCOUNT_INSTRUCTION_IDX,
687                            )?,
688                            blober: get_account_at_index(
689                                &tx,
690                                compiled_instruction,
691                                BLOB_BLOBER_INSTRUCTION_IDX,
692                            )?,
693                            instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
694                        })
695                    })
696                    .filter(|instruction| {
697                        instruction.blober == blober
698                            && matches!(
699                                instruction.instruction,
700                                RelevantInstruction::FinalizeBlob(_)
701                            )
702                    })
703                    .collect::<Vec<_>>();
704
705                instructions.is_empty().then_some(
706                    instructions
707                        .iter()
708                        .map(|instruction| (instruction.blob, tx.message.clone()))
709                        .collect::<Vec<_>>(),
710                )
711            })
712            .flatten()
713            .collect::<Vec<_>>();
714
715        Ok(finalized)
716    }
717}