nitro_da_client/
blober_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    sync::Arc,
5    time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use futures::StreamExt;
12use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
13use nitro_da_blober::{
14    find_blob_address, find_blober_address,
15    instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
16    state::blober::Blober,
17    CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE,
18};
19use nitro_da_indexer_api::{
20    extract_relevant_instructions, get_account_at_index, BlobsByBlober, BlobsByPayer,
21    CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
22};
23use solana_cli_config::Config;
24use solana_client::rpc_config::RpcTransactionConfig;
25use solana_rpc_client::nonblocking::rpc_client::RpcClient;
26use solana_rpc_client_api::config::RpcBlockConfig;
27use solana_sdk::{
28    commitment_config::CommitmentConfig,
29    message::VersionedMessage,
30    pubkey::Pubkey,
31    signature::{Keypair, Signature},
32    signer::Signer,
33};
34use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
35use tracing::{info_span, Instrument, Span};
36
37use crate::{
38    batch_client::{BatchClient, SuccessfulTransaction},
39    constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
40    fees::{Fee, FeeStrategy, Lamports, Priority},
41    helpers::{
42        check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
43        get_unique_timestamp,
44    },
45    tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
46    types::{IndexerError, TransactionType, UploadBlobError},
47    BloberClientError, BloberClientResult, LedgerDataBlobError,
48};
49
50#[derive(Builder, Clone)]
51pub struct BloberClient {
52    #[builder(getter(name = get_payer, vis = ""))]
53    pub(crate) payer: Arc<Keypair>,
54    pub(crate) program_id: Pubkey,
55    pub(crate) rpc_client: Arc<RpcClient>,
56    pub(crate) batch_client: BatchClient,
57    // Optional for the sake of testing, because in some tests indexer client is not used
58    pub(crate) indexer_client: Option<Arc<WsClient>>,
59    #[builder(default = false)]
60    pub(crate) helius_fee_estimate: bool,
61}
62
63impl<State: blober_client_builder::State> BloberClientBuilder<State> {
64    /// Adds an indexer client to the builder based on the given indexer URL.
65    ///
66    /// # Example
67    ///
68    /// ```rust
69    /// use blober_client::BloberClient;
70    ///
71    /// let builder_with_indexer = BloberClient::builder()
72    ///     .indexer_from_url("ws://localhost:8080")
73    ///     .await?;
74    /// ```
75    pub async fn indexer_from_url(
76        self,
77        indexer_url: &str,
78    ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
79    where
80        State::IndexerClient: IsUnset,
81    {
82        let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
83        Ok(self.indexer_client(Arc::new(indexer_client)))
84    }
85
86    /// Builds a new `BloberClient` with an RPC client and a batch client built from the given
87    /// Solana cli [`Config`].
88    ///
89    /// # Example
90    ///
91    /// ```rust
92    /// use std::sync::Arc;
93    ///
94    /// use blober_client::{BloberClient};
95    /// use solana_cli_config::Config;
96    /// use solana_sdk::{pubkey::Pubkey, signature::Keypair};
97    ///
98    /// let payer = Arc::new(Keypair::new());
99    /// let program_id = Pubkey::new_unique();
100    /// let solana_config = Config::default();
101    /// let client = BloberClient::builder()
102    ///     .payer(payer)
103    ///     .program_id(program_id)
104    ///     .build_with_config(solana_config)
105    ///     .await?;
106    /// ```
107    pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
108    where
109        State::Payer: IsSet,
110        State::ProgramId: IsSet,
111        State::RpcClient: IsUnset,
112        State::BatchClient: IsUnset,
113    {
114        let rpc_client = Arc::new(RpcClient::new_with_commitment(
115            solana_config.json_rpc_url.clone(),
116            CommitmentConfig::from_str(&solana_config.commitment)?,
117        ));
118        let payer = self.get_payer().clone();
119        Ok(self
120            .rpc_client(rpc_client.clone())
121            .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
122            .build())
123    }
124
125    pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
126    where
127        State::HeliusFeeEstimate: IsUnset,
128    {
129        self.helius_fee_estimate(true)
130    }
131}
132
133impl BloberClient {
134    /// Returns the underlaying [`RpcClient`].
135    pub fn rpc_client(&self) -> Arc<RpcClient> {
136        self.rpc_client.clone()
137    }
138
139    /// Returns the transaction payer [`Keypair`].
140    pub fn payer(&self) -> Arc<Keypair> {
141        self.payer.clone()
142    }
143
144    /// Initializes a new [`Blober`] PDA account.
145    pub async fn initialize_blober(
146        &self,
147        fee_strategy: FeeStrategy,
148        namespace: &str,
149        timeout: Option<Duration>,
150    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
151        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
152
153        let fee_strategy = self
154            .convert_fee_strategy_to_fixed(
155                fee_strategy,
156                &[blober],
157                TransactionType::InitializeBlober,
158            )
159            .in_current_span()
160            .await?;
161
162        let msg = Initialize::build_message(MessageArguments::new(
163            self.program_id,
164            blober,
165            &self.payer,
166            self.rpc_client.clone(),
167            fee_strategy,
168            self.helius_fee_estimate,
169            (namespace.to_owned(), blober),
170        ))
171        .await
172        .expect("infallible with a fixed fee strategy");
173
174        let span = info_span!(parent: Span::current(), "initialize_blober");
175        Ok(check_outcomes(
176            self.batch_client
177                .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
178                .instrument(span)
179                .await,
180        )
181        .map_err(UploadBlobError::InitializeBlober)?)
182    }
183
184    /// Closes a [`Blober`] PDA account.
185    pub async fn close_blober(
186        &self,
187        fee_strategy: FeeStrategy,
188        namespace: &str,
189        timeout: Option<Duration>,
190    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
191        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
192
193        let fee_strategy = self
194            .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
195            .in_current_span()
196            .await?;
197
198        let msg = Close::build_message(MessageArguments::new(
199            self.program_id,
200            blober,
201            &self.payer,
202            self.rpc_client.clone(),
203            fee_strategy,
204            self.helius_fee_estimate,
205            (),
206        ))
207        .await
208        .expect("infallible with a fixed fee strategy");
209
210        let span = info_span!(parent: Span::current(), "close_blober");
211        Ok(check_outcomes(
212            self.batch_client
213                .send(vec![(TransactionType::CloseBlober, msg)], timeout)
214                .instrument(span)
215                .await,
216        )
217        .map_err(UploadBlobError::CloseBlober)?)
218    }
219
220    /// Uploads a blob of data with the given [`Blober`] PDA account.
221    /// Under the hood it creates a new [`blober::state::blob::Blob`] PDA which stores a incremental hash of the chunks
222    /// from the blob data. On completion of the blob upload, the blob PDA gets closed sending it's
223    /// funds back to the [`BloberClient::payer`].
224    /// If the blob upload fails, the blob PDA gets discarded and the funds also get sent to the
225    /// [`BloberClient::payer`].
226    pub async fn upload_blob(
227        &self,
228        blob_data: &[u8],
229        fee_strategy: FeeStrategy,
230        namespace: &str,
231        timeout: Option<Duration>,
232    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
233        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
234        let timestamp = get_unique_timestamp();
235
236        let blob = find_blob_address(
237            self.program_id,
238            self.payer.pubkey(),
239            blober,
240            timestamp,
241            blob_data.len(),
242        );
243
244        let upload_messages = self
245            .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
246            .await?;
247
248        let res = self
249            .do_upload(upload_messages, timeout)
250            .in_current_span()
251            .await;
252
253        if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
254            self.discard_blob(fee_strategy, blob, namespace, timeout)
255                .await
256        } else {
257            res
258        }
259    }
260
261    /// Discards a [`blober::state::blob::Blob`] PDA account registered with the provided
262    /// [`Blober`] PDA account.
263    pub async fn discard_blob(
264        &self,
265        fee_strategy: FeeStrategy,
266        blob: Pubkey,
267        namespace: &str,
268        timeout: Option<Duration>,
269    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
270        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
271
272        let fee_strategy = self
273            .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
274            .in_current_span()
275            .await?;
276
277        let msg = DiscardBlob::build_message(MessageArguments::new(
278            self.program_id,
279            blober,
280            &self.payer,
281            self.rpc_client.clone(),
282            fee_strategy,
283            self.helius_fee_estimate,
284            blob,
285        ))
286        .in_current_span()
287        .await
288        .expect("infallible with a fixed fee strategy");
289
290        let span = info_span!(parent: Span::current(), "discard_blob");
291
292        Ok(check_outcomes(
293            self.batch_client
294                .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
295                .instrument(span)
296                .await,
297        )
298        .map_err(UploadBlobError::DiscardBlob)?)
299    }
300
301    /// Estimates fees for uploading a blob of the size `blob_size` with the given `priority`.
302    /// This whole functions is basically a simulation that doesn't run anything. Instead of executing transactions,
303    /// it just sums the expected fees and number of signatures.
304    ///
305    /// The [`blober::state::blob::Blob`] PDA account is always newly created, so for estimating compute fees
306    /// we don't even need the real keypair, any unused pubkey will do.
307    pub async fn estimate_fees(
308        &self,
309        blob_size: usize,
310        blober: Pubkey,
311        priority: Priority,
312    ) -> BloberClientResult<Fee> {
313        let prioritization_fee_rate = priority
314            .get_priority_fee_estimate(
315                &self.rpc_client,
316                &[Pubkey::new_unique(), blober, self.payer.pubkey()],
317                self.helius_fee_estimate,
318            )
319            .await?;
320
321        let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
322
323        let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
324            (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
325        } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
326            (
327                CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
328                CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
329            )
330        } else {
331            (
332                DeclareBlob::COMPUTE_UNIT_LIMIT
333                    + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
334                    + CompoundFinalize::COMPUTE_UNIT_LIMIT,
335                DeclareBlob::NUM_SIGNATURES
336                    + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
337                    + CompoundFinalize::NUM_SIGNATURES,
338            )
339        };
340
341        // The base Solana transaction fee = 5000.
342        // Reference link: https://solana.com/docs/core/fees#:~:text=While%20transaction%20fees%20are%20paid,of%205k%20lamports%20per%20signature.
343        let price_per_signature = Lamports::new(5000);
344
345        let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
346
347        Ok(Fee {
348            num_signatures,
349            price_per_signature,
350            compute_unit_limit,
351            prioritization_fee_rate,
352            blob_account_size,
353        })
354    }
355
356    /// Returns the raw blob data from the ledger for the given signatures.
357    pub async fn get_ledger_blobs_from_signatures(
358        &self,
359        namespace: &str,
360        payer_pubkey: Option<Pubkey>,
361        signatures: Vec<Signature>,
362    ) -> BloberClientResult<Vec<u8>> {
363        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
364        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
365
366        let relevant_transactions = futures::stream::iter(signatures)
367            .map(|signature| async move {
368                self.rpc_client
369                    .get_transaction_with_config(
370                        &signature,
371                        RpcTransactionConfig {
372                            commitment: Some(self.rpc_client.commitment()),
373                            encoding: Some(UiTransactionEncoding::Base58),
374                            ..Default::default()
375                        },
376                    )
377                    .await
378            })
379            .buffer_unordered(DEFAULT_CONCURRENCY)
380            .collect::<Vec<_>>()
381            .await
382            .into_iter()
383            .collect::<Result<Vec<_>, _>>()?;
384
385        let relevant_instructions = extract_relevant_instructions(
386            &relevant_transactions
387                .iter()
388                .filter_map(|encoded| match &encoded.transaction.meta {
389                    Some(meta) if meta.status.is_err() => None,
390                    _ => encoded.transaction.transaction.decode(),
391                })
392                .collect::<Vec<_>>(),
393        );
394
395        let declares = relevant_instructions
396            .iter()
397            .filter_map(|instruction| {
398                (instruction.blober == blober
399                    && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
400                .then_some(instruction.blob)
401            })
402            .collect::<Vec<Pubkey>>();
403
404        let Some(blob) = declares.first() else {
405            return Err(LedgerDataBlobError::DeclareNotFound.into());
406        };
407
408        if declares.len() > 1 {
409            return Err(LedgerDataBlobError::MultipleDeclares.into());
410        }
411
412        if relevant_instructions
413            .iter()
414            .filter(|instruction| {
415                matches!(
416                    instruction.instruction,
417                    RelevantInstruction::FinalizeBlob(_)
418                )
419            })
420            .count()
421            > 1
422        {
423            return Err(LedgerDataBlobError::MultipleFinalizes.into());
424        }
425
426        Ok(get_blob_data_from_instructions(
427            &relevant_instructions,
428            blober,
429            *blob,
430        )?)
431    }
432
433    /// Fetches all blobs finalized in a given slot from the ledger.
434    pub async fn get_ledger_blobs(
435        &self,
436        slot: u64,
437        namespace: &str,
438        payer_pubkey: Option<Pubkey>,
439        lookback_slots: Option<u64>,
440    ) -> BloberClientResult<Vec<Vec<u8>>> {
441        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
442        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
443
444        let block_config = RpcBlockConfig {
445            commitment: Some(self.rpc_client.commitment()),
446            encoding: Some(UiTransactionEncoding::Base58),
447            ..Default::default()
448        };
449        let block = self
450            .rpc_client
451            .get_block_with_config(slot, block_config)
452            .await?;
453
454        let Some(transactions) = block.transactions else {
455            // If there are no transactions in the block, that means there are no blobs to fetch.
456            return Ok(Vec::new());
457        };
458
459        let relevant_instructions = extract_relevant_instructions(
460            &transactions
461                .iter()
462                .filter_map(|tx| match &tx.meta {
463                    Some(meta) if meta.status.is_err() => None,
464                    _ => tx.transaction.decode(),
465                })
466                .collect::<Vec<_>>(),
467        );
468        let finalized_blobs = relevant_instructions
469            .iter()
470            .filter_map(|instruction| {
471                (instruction.blober == blober
472                    && matches!(
473                        instruction.instruction,
474                        RelevantInstruction::FinalizeBlob(_)
475                    ))
476                .then_some(instruction.blob)
477            })
478            .collect::<HashSet<Pubkey>>();
479
480        let mut relevant_instructions_map = HashMap::new();
481        filter_relevant_instructions(
482            relevant_instructions,
483            &finalized_blobs,
484            &mut relevant_instructions_map,
485        );
486
487        let mut blobs = HashMap::with_capacity(finalized_blobs.len());
488        for blob in &finalized_blobs {
489            let instructions = relevant_instructions_map
490                .get(blob)
491                .expect("This should never happen since we at least have the finalize instruction");
492
493            if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
494                blobs.insert(blob, blob_data);
495            }
496        }
497
498        // If all blobs are found, return them.
499        if blobs.len() == finalized_blobs.len() {
500            return Ok(blobs.values().cloned().collect());
501        }
502
503        let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
504
505        let block_slots = self
506            .rpc_client
507            .get_blocks_with_commitment(
508                slot - lookback_slots,
509                Some(slot - 1),
510                self.rpc_client.commitment(),
511            )
512            .await?;
513
514        for slot in block_slots.into_iter().rev() {
515            let block = self
516                .rpc_client
517                .get_block_with_config(slot, block_config)
518                .await?;
519            let Some(transactions) = block.transactions else {
520                // If there are no transactions in the block, go to the next block.
521                continue;
522            };
523            let new_relevant_instructions = extract_relevant_instructions(
524                &transactions
525                    .iter()
526                    .filter_map(|tx| match &tx.meta {
527                        Some(meta) if meta.status.is_err() => None,
528                        _ => tx.transaction.decode(),
529                    })
530                    .collect::<Vec<_>>(),
531            );
532            filter_relevant_instructions(
533                new_relevant_instructions,
534                &finalized_blobs,
535                &mut relevant_instructions_map,
536            );
537            for blob in &finalized_blobs {
538                if blobs.contains_key(blob) {
539                    continue;
540                }
541                let instructions = relevant_instructions_map.get(blob).expect(
542                    "This should never happen since we at least have the finalize instruction",
543                );
544                println!("total {}", instructions.len());
545
546                if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
547                {
548                    blobs.insert(blob, blob_data);
549                }
550            }
551            if blobs.len() == finalized_blobs.len() {
552                break;
553            }
554        }
555
556        Ok(blobs.values().cloned().collect())
557    }
558
559    /// Fetches all blobs for a given slot from the [`IndexerRpcClient`].
560    pub async fn get_blobs(
561        &self,
562        slot: u64,
563        namespace: &str,
564        payer_pubkey: Option<Pubkey>,
565    ) -> BloberClientResult<Vec<Vec<u8>>> {
566        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
567        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
568
569        loop {
570            let blobs = self
571                .indexer()
572                .get_blobs(blober, slot)
573                .await
574                .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
575            if let Some(blobs) = blobs {
576                return Ok(blobs);
577            }
578            tokio::time::sleep(Duration::from_millis(100)).await;
579        }
580    }
581
582    /// Fetches blobs for a given [`BlobsByBlober`] from the [`IndexerRpcClient`].
583    pub async fn get_blobs_by_blober(
584        &self,
585        blober_blobs: BlobsByBlober,
586    ) -> BloberClientResult<Vec<Vec<u8>>> {
587        let blober = blober_blobs.blober;
588
589        self.indexer()
590            .get_blobs_by_blober(blober_blobs)
591            .await
592            .map_err(|e| IndexerError::BlobsForBlober(blober.to_string(), e.to_string()).into())
593    }
594
595    /// Fetches blobs for a given [`BlobsByPayer`] from the [`IndexerRpcClient`].
596    pub async fn get_blobs_by_payer(
597        &self,
598        payer_blobs: BlobsByPayer,
599    ) -> BloberClientResult<Vec<Vec<u8>>> {
600        let payer = payer_blobs.payer;
601
602        self.indexer()
603            .get_blobs_by_payer(payer_blobs)
604            .await
605            .map_err(|e| IndexerError::BlobsForPayer(payer.to_string(), e.to_string()).into())
606    }
607
608    /// Fetches compound proof for a given slot from the [`IndexerRpcClient`].
609    pub async fn get_slot_proof(
610        &self,
611        slot: u64,
612        namespace: &str,
613        payer_pubkey: Option<Pubkey>,
614    ) -> BloberClientResult<CompoundProof> {
615        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
616        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
617
618        loop {
619            let proof = self
620                .indexer()
621                .get_proof(blober, slot)
622                .await
623                .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
624            if let Some(proofs) = proof {
625                return Ok(proofs);
626            }
627            tokio::time::sleep(Duration::from_millis(100)).await;
628        }
629    }
630
631    /// Fetches compound proof for a given blob PDA [`Pubkey`] from the [`IndexerRpcClient`].
632    pub async fn get_blob_proof(&self, blob: Pubkey) -> BloberClientResult<Option<CompoundProof>> {
633        self.indexer()
634            .get_proof_for_blob(blob)
635            .await
636            .map_err(|e| IndexerError::ProofForBlob(blob.to_string(), e.to_string()).into())
637    }
638
639    /// Fetches blob messages for a given slot
640    /// Returns a tuple of ([`Pubkey`], [`VersionedMessage`]) where the Pubkey is the address of
641    /// the [`blober::state::blob::Blob`] account and the VersionedMessage is the message that
642    /// included the [`blober::instruction::FinalizeBlob`] instruction.
643    pub async fn get_blob_messages(
644        &self,
645        slot: u64,
646        namespace: &str,
647        payer_pubkey: Option<Pubkey>,
648    ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
649        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
650        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
651
652        let block: EncodedConfirmedBlock = self
653            .rpc_client
654            .get_block_with_config(
655                slot,
656                RpcBlockConfig {
657                    commitment: Some(self.rpc_client.commitment()),
658                    encoding: Some(UiTransactionEncoding::Base58),
659                    ..Default::default()
660                },
661            )
662            .await?
663            .into();
664
665        let finalized = block
666            .transactions
667            .iter()
668            .filter_map(|tx| match &tx.meta {
669                Some(meta) if meta.status.is_err() => None,
670                _ => tx.transaction.decode(),
671            })
672            .filter_map(|tx| {
673                let instructions = tx
674                    .message
675                    .instructions()
676                    .iter()
677                    .filter_map(|compiled_instruction| {
678                        Some(RelevantInstructionWithAccounts {
679                            blob: get_account_at_index(&tx, compiled_instruction, 0)?,
680                            blober: get_account_at_index(&tx, compiled_instruction, 1)?,
681                            instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
682                        })
683                    })
684                    .filter(|instruction| {
685                        instruction.blober == blober
686                            && matches!(
687                                instruction.instruction,
688                                RelevantInstruction::FinalizeBlob(_)
689                            )
690                    })
691                    .collect::<Vec<_>>();
692
693                instructions.is_empty().then_some(
694                    instructions
695                        .iter()
696                        .map(|instruction| (instruction.blob, tx.message.clone()))
697                        .collect::<Vec<_>>(),
698                )
699            })
700            .flatten()
701            .collect::<Vec<_>>();
702
703        Ok(finalized)
704    }
705}