nitro_da_client/
blober_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    sync::Arc,
5    time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use futures::StreamExt;
12use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
13use nitro_da_blober::{
14    CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
15    instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
16    state::blober::Blober,
17};
18use nitro_da_indexer_api::{
19    CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
20    extract_relevant_instructions, get_account_at_index,
21};
22use solana_cli_config::Config;
23use solana_client::rpc_config::RpcTransactionConfig;
24use solana_rpc_client::nonblocking::rpc_client::RpcClient;
25use solana_rpc_client_api::config::RpcBlockConfig;
26use solana_sdk::{
27    commitment_config::CommitmentConfig,
28    message::VersionedMessage,
29    pubkey::Pubkey,
30    signature::{Keypair, Signature},
31    signer::Signer,
32};
33use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
34use tracing::{Instrument, Span, info_span};
35
36use crate::{
37    BloberClientError, BloberClientResult, LedgerDataBlobError,
38    batch_client::{BatchClient, SuccessfulTransaction},
39    constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
40    fees::{Fee, FeeStrategy, Lamports, Priority},
41    helpers::{
42        check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
43        get_unique_timestamp,
44    },
45    tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
46    types::{IndexerError, TransactionType, UploadBlobError},
47};
48
49#[derive(Builder, Clone)]
50pub struct BloberClient {
51    #[builder(getter(name = get_payer, vis = ""))]
52    pub(crate) payer: Arc<Keypair>,
53    pub(crate) program_id: Pubkey,
54    pub(crate) rpc_client: Arc<RpcClient>,
55    pub(crate) batch_client: BatchClient,
56    // Optional for the sake of testing, because in some tests indexer client is not used
57    pub(crate) indexer_client: Option<Arc<WsClient>>,
58    #[builder(default = false)]
59    pub(crate) helius_fee_estimate: bool,
60}
61
62impl<State: blober_client_builder::State> BloberClientBuilder<State> {
63    /// Adds an indexer client to the builder based on the given indexer URL.
64    ///
65    /// # Example
66    ///
67    /// ```rust
68    /// use blober_client::BloberClient;
69    ///
70    /// let builder_with_indexer = BloberClient::builder()
71    ///     .indexer_from_url("ws://localhost:8080")
72    ///     .await?;
73    /// ```
74    pub async fn indexer_from_url(
75        self,
76        indexer_url: &str,
77    ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
78    where
79        State::IndexerClient: IsUnset,
80    {
81        let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
82        Ok(self.indexer_client(Arc::new(indexer_client)))
83    }
84
85    /// Builds a new `BloberClient` with an RPC client and a batch client built from the given
86    /// Solana cli [`Config`].
87    ///
88    /// # Example
89    ///
90    /// ```rust
91    /// use std::sync::Arc;
92    ///
93    /// use blober_client::{BloberClient};
94    /// use solana_cli_config::Config;
95    /// use solana_sdk::{pubkey::Pubkey, signature::Keypair};
96    ///
97    /// let payer = Arc::new(Keypair::new());
98    /// let program_id = Pubkey::new_unique();
99    /// let solana_config = Config::default();
100    /// let client = BloberClient::builder()
101    ///     .payer(payer)
102    ///     .program_id(program_id)
103    ///     .build_with_config(solana_config)
104    ///     .await?;
105    /// ```
106    pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
107    where
108        State::Payer: IsSet,
109        State::ProgramId: IsSet,
110        State::RpcClient: IsUnset,
111        State::BatchClient: IsUnset,
112    {
113        let rpc_client = Arc::new(RpcClient::new_with_commitment(
114            solana_config.json_rpc_url.clone(),
115            CommitmentConfig::from_str(&solana_config.commitment)?,
116        ));
117        let payer = self.get_payer().clone();
118        Ok(self
119            .rpc_client(rpc_client.clone())
120            .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
121            .build())
122    }
123
124    pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
125    where
126        State::HeliusFeeEstimate: IsUnset,
127    {
128        self.helius_fee_estimate(true)
129    }
130}
131
132impl BloberClient {
133    /// Returns the underlaying [`RpcClient`].
134    pub fn rpc_client(&self) -> Arc<RpcClient> {
135        self.rpc_client.clone()
136    }
137
138    /// Returns the transaction payer [`Keypair`].
139    pub fn payer(&self) -> Arc<Keypair> {
140        self.payer.clone()
141    }
142
143    /// Initializes a new [`Blober`] PDA account.
144    pub async fn initialize_blober(
145        &self,
146        fee_strategy: FeeStrategy,
147        namespace: &str,
148        timeout: Option<Duration>,
149    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
150        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
151
152        let fee_strategy = self
153            .convert_fee_strategy_to_fixed(
154                fee_strategy,
155                &[blober],
156                TransactionType::InitializeBlober,
157            )
158            .in_current_span()
159            .await?;
160
161        let msg = Initialize::build_message(MessageArguments::new(
162            self.program_id,
163            blober,
164            &self.payer,
165            self.rpc_client.clone(),
166            fee_strategy,
167            self.helius_fee_estimate,
168            (namespace.to_owned(), blober),
169        ))
170        .await
171        .expect("infallible with a fixed fee strategy");
172
173        let span = info_span!(parent: Span::current(), "initialize_blober");
174        Ok(check_outcomes(
175            self.batch_client
176                .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
177                .instrument(span)
178                .await,
179        )
180        .map_err(UploadBlobError::InitializeBlober)?)
181    }
182
183    /// Closes a [`Blober`] PDA account.
184    pub async fn close_blober(
185        &self,
186        fee_strategy: FeeStrategy,
187        namespace: &str,
188        timeout: Option<Duration>,
189    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
190        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
191
192        let fee_strategy = self
193            .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
194            .in_current_span()
195            .await?;
196
197        let msg = Close::build_message(MessageArguments::new(
198            self.program_id,
199            blober,
200            &self.payer,
201            self.rpc_client.clone(),
202            fee_strategy,
203            self.helius_fee_estimate,
204            (),
205        ))
206        .await
207        .expect("infallible with a fixed fee strategy");
208
209        let span = info_span!(parent: Span::current(), "close_blober");
210        Ok(check_outcomes(
211            self.batch_client
212                .send(vec![(TransactionType::CloseBlober, msg)], timeout)
213                .instrument(span)
214                .await,
215        )
216        .map_err(UploadBlobError::CloseBlober)?)
217    }
218
219    /// Uploads a blob of data with the given [`Blober`] PDA account.
220    /// Under the hood it creates a new [`blober::state::blob::Blob`] PDA which stores a incremental hash of the chunks
221    /// from the blob data. On completion of the blob upload, the blob PDA gets closed sending it's
222    /// funds back to the [`BloberClient::payer`].
223    /// If the blob upload fails, the blob PDA gets discarded and the funds also get sent to the
224    /// [`BloberClient::payer`].
225    pub async fn upload_blob(
226        &self,
227        blob_data: &[u8],
228        fee_strategy: FeeStrategy,
229        namespace: &str,
230        timeout: Option<Duration>,
231    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
232        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
233        let timestamp = get_unique_timestamp();
234
235        let blob = find_blob_address(
236            self.program_id,
237            self.payer.pubkey(),
238            blober,
239            timestamp,
240            blob_data.len(),
241        );
242
243        let upload_messages = self
244            .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
245            .await?;
246
247        let res = self
248            .do_upload(upload_messages, timeout)
249            .in_current_span()
250            .await;
251
252        if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
253            self.discard_blob(fee_strategy, blob, namespace, timeout)
254                .await
255        } else {
256            res
257        }
258    }
259
260    /// Discards a [`blober::state::blob::Blob`] PDA account registered with the provided
261    /// [`Blober`] PDA account.
262    pub async fn discard_blob(
263        &self,
264        fee_strategy: FeeStrategy,
265        blob: Pubkey,
266        namespace: &str,
267        timeout: Option<Duration>,
268    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
269        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
270
271        let fee_strategy = self
272            .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
273            .in_current_span()
274            .await?;
275
276        let msg = DiscardBlob::build_message(MessageArguments::new(
277            self.program_id,
278            blober,
279            &self.payer,
280            self.rpc_client.clone(),
281            fee_strategy,
282            self.helius_fee_estimate,
283            blob,
284        ))
285        .in_current_span()
286        .await
287        .expect("infallible with a fixed fee strategy");
288
289        let span = info_span!(parent: Span::current(), "discard_blob");
290
291        Ok(check_outcomes(
292            self.batch_client
293                .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
294                .instrument(span)
295                .await,
296        )
297        .map_err(UploadBlobError::DiscardBlob)?)
298    }
299
300    /// Estimates fees for uploading a blob of the size `blob_size` with the given `priority`.
301    /// This whole functions is basically a simulation that doesn't run anything. Instead of executing transactions,
302    /// it just sums the expected fees and number of signatures.
303    ///
304    /// The [`blober::state::blob::Blob`] PDA account is always newly created, so for estimating compute fees
305    /// we don't even need the real keypair, any unused pubkey will do.
306    pub async fn estimate_fees(
307        &self,
308        blob_size: usize,
309        blober: Pubkey,
310        priority: Priority,
311    ) -> BloberClientResult<Fee> {
312        let prioritization_fee_rate = priority
313            .get_priority_fee_estimate(
314                &self.rpc_client,
315                &[Pubkey::new_unique(), blober, self.payer.pubkey()],
316                self.helius_fee_estimate,
317            )
318            .await?;
319
320        let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
321
322        let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
323            (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
324        } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
325            (
326                CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
327                CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
328            )
329        } else {
330            (
331                DeclareBlob::COMPUTE_UNIT_LIMIT
332                    + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
333                    + CompoundFinalize::COMPUTE_UNIT_LIMIT,
334                DeclareBlob::NUM_SIGNATURES
335                    + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
336                    + CompoundFinalize::NUM_SIGNATURES,
337            )
338        };
339
340        // The base Solana transaction fee = 5000.
341        // Reference link: https://solana.com/docs/core/fees#:~:text=While%20transaction%20fees%20are%20paid,of%205k%20lamports%20per%20signature.
342        let price_per_signature = Lamports::new(5000);
343
344        let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
345
346        Ok(Fee {
347            num_signatures,
348            price_per_signature,
349            compute_unit_limit,
350            prioritization_fee_rate,
351            blob_account_size,
352        })
353    }
354
355    /// Returns the raw blob data from the ledger for the given signatures.
356    pub async fn get_ledger_blobs_from_signatures(
357        &self,
358        namespace: &str,
359        payer_pubkey: Option<Pubkey>,
360        signatures: Vec<Signature>,
361    ) -> BloberClientResult<Vec<u8>> {
362        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
363        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
364
365        let relevant_transactions = futures::stream::iter(signatures)
366            .map(|signature| async move {
367                self.rpc_client
368                    .get_transaction_with_config(
369                        &signature,
370                        RpcTransactionConfig {
371                            commitment: Some(self.rpc_client.commitment()),
372                            encoding: Some(UiTransactionEncoding::Base58),
373                            ..Default::default()
374                        },
375                    )
376                    .await
377            })
378            .buffer_unordered(DEFAULT_CONCURRENCY)
379            .collect::<Vec<_>>()
380            .await
381            .into_iter()
382            .collect::<Result<Vec<_>, _>>()?;
383
384        let relevant_instructions = extract_relevant_instructions(
385            &relevant_transactions
386                .iter()
387                .filter_map(|encoded| match &encoded.transaction.meta {
388                    Some(meta) if meta.status.is_err() => None,
389                    _ => encoded.transaction.transaction.decode(),
390                })
391                .collect::<Vec<_>>(),
392        );
393
394        let declares = relevant_instructions
395            .iter()
396            .filter_map(|instruction| {
397                (instruction.blober == blober
398                    && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
399                .then_some(instruction.blob)
400            })
401            .collect::<Vec<Pubkey>>();
402
403        let Some(blob) = declares.first() else {
404            return Err(LedgerDataBlobError::DeclareNotFound.into());
405        };
406
407        if declares.len() > 1 {
408            return Err(LedgerDataBlobError::MultipleDeclares.into());
409        }
410
411        if relevant_instructions
412            .iter()
413            .filter(|instruction| {
414                matches!(
415                    instruction.instruction,
416                    RelevantInstruction::FinalizeBlob(_)
417                )
418            })
419            .count()
420            > 1
421        {
422            return Err(LedgerDataBlobError::MultipleFinalizes.into());
423        }
424
425        Ok(get_blob_data_from_instructions(
426            &relevant_instructions,
427            blober,
428            *blob,
429        )?)
430    }
431
432    /// Fetches all blobs finalized in a given slot from the ledger.
433    pub async fn get_ledger_blobs(
434        &self,
435        slot: u64,
436        namespace: &str,
437        payer_pubkey: Option<Pubkey>,
438        lookback_slots: Option<u64>,
439    ) -> BloberClientResult<Vec<Vec<u8>>> {
440        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
441        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
442
443        let block_config = RpcBlockConfig {
444            commitment: Some(self.rpc_client.commitment()),
445            encoding: Some(UiTransactionEncoding::Base58),
446            ..Default::default()
447        };
448        let block = self
449            .rpc_client
450            .get_block_with_config(slot, block_config)
451            .await?;
452
453        let Some(transactions) = block.transactions else {
454            // If there are no transactions in the block, that means there are no blobs to fetch.
455            return Ok(Vec::new());
456        };
457
458        let relevant_instructions = extract_relevant_instructions(
459            &transactions
460                .iter()
461                .filter_map(|tx| match &tx.meta {
462                    Some(meta) if meta.status.is_err() => None,
463                    _ => tx.transaction.decode(),
464                })
465                .collect::<Vec<_>>(),
466        );
467        let finalized_blobs = relevant_instructions
468            .iter()
469            .filter_map(|instruction| {
470                (instruction.blober == blober
471                    && matches!(
472                        instruction.instruction,
473                        RelevantInstruction::FinalizeBlob(_)
474                    ))
475                .then_some(instruction.blob)
476            })
477            .collect::<HashSet<Pubkey>>();
478
479        let mut relevant_instructions_map = HashMap::new();
480        filter_relevant_instructions(
481            relevant_instructions,
482            &finalized_blobs,
483            &mut relevant_instructions_map,
484        );
485
486        let mut blobs = HashMap::with_capacity(finalized_blobs.len());
487        for blob in &finalized_blobs {
488            let instructions = relevant_instructions_map
489                .get(blob)
490                .expect("This should never happen since we at least have the finalize instruction");
491
492            if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
493                blobs.insert(blob, blob_data);
494            }
495        }
496
497        // If all blobs are found, return them.
498        if blobs.len() == finalized_blobs.len() {
499            return Ok(blobs.values().cloned().collect());
500        }
501
502        let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
503
504        let block_slots = self
505            .rpc_client
506            .get_blocks_with_commitment(
507                slot - lookback_slots,
508                Some(slot - 1),
509                self.rpc_client.commitment(),
510            )
511            .await?;
512
513        for slot in block_slots.into_iter().rev() {
514            let block = self
515                .rpc_client
516                .get_block_with_config(slot, block_config)
517                .await?;
518            let Some(transactions) = block.transactions else {
519                // If there are no transactions in the block, go to the next block.
520                continue;
521            };
522            let new_relevant_instructions = extract_relevant_instructions(
523                &transactions
524                    .iter()
525                    .filter_map(|tx| match &tx.meta {
526                        Some(meta) if meta.status.is_err() => None,
527                        _ => tx.transaction.decode(),
528                    })
529                    .collect::<Vec<_>>(),
530            );
531            filter_relevant_instructions(
532                new_relevant_instructions,
533                &finalized_blobs,
534                &mut relevant_instructions_map,
535            );
536            for blob in &finalized_blobs {
537                if blobs.contains_key(blob) {
538                    continue;
539                }
540                let instructions = relevant_instructions_map.get(blob).expect(
541                    "This should never happen since we at least have the finalize instruction",
542                );
543                println!("total {}", instructions.len());
544
545                if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
546                {
547                    blobs.insert(blob, blob_data);
548                }
549            }
550            if blobs.len() == finalized_blobs.len() {
551                break;
552            }
553        }
554
555        Ok(blobs.values().cloned().collect())
556    }
557
558    /// Fetches all blobs for a given slot from the [`IndexerRpcClient`].
559    pub async fn get_blobs(
560        &self,
561        slot: u64,
562        namespace: &str,
563        payer_pubkey: Option<Pubkey>,
564    ) -> BloberClientResult<Vec<Vec<u8>>> {
565        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
566        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
567
568        loop {
569            let blobs = self
570                .indexer()
571                .get_blobs(blober, slot)
572                .await
573                .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
574            if let Some(blobs) = blobs {
575                return Ok(blobs);
576            }
577            tokio::time::sleep(Duration::from_millis(100)).await;
578        }
579    }
580
581    /// Fetches compound proof for a given slot from the [`IndexerRpcClient`].
582    pub async fn get_slot_proof(
583        &self,
584        slot: u64,
585        namespace: &str,
586        payer_pubkey: Option<Pubkey>,
587    ) -> BloberClientResult<CompoundProof> {
588        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
589        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
590
591        loop {
592            let proof = self
593                .indexer()
594                .get_proof(blober, slot)
595                .await
596                .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
597            if let Some(proofs) = proof {
598                return Ok(proofs);
599            }
600            tokio::time::sleep(Duration::from_millis(100)).await;
601        }
602    }
603
604    /// Fetches blob messages for a given slot
605    /// Returns a tuple of ([`Pubkey`], [`VersionedMessage`]) where the Pubkey is the address of
606    /// the [`blober::state::blob::Blob`] account and the VersionedMessage is the message that
607    /// included the [`blober::instruction::FinalizeBlob`] instruction.
608    pub async fn get_blob_messages(
609        &self,
610        slot: u64,
611        namespace: &str,
612        payer_pubkey: Option<Pubkey>,
613    ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
614        let payer_pubkey = payer_pubkey.unwrap_or(self.payer.pubkey());
615        let blober = find_blober_address(self.program_id, payer_pubkey, namespace);
616
617        let block: EncodedConfirmedBlock = self
618            .rpc_client
619            .get_block_with_config(
620                slot,
621                RpcBlockConfig {
622                    commitment: Some(self.rpc_client.commitment()),
623                    encoding: Some(UiTransactionEncoding::Base58),
624                    ..Default::default()
625                },
626            )
627            .await?
628            .into();
629
630        let finalized = block
631            .transactions
632            .iter()
633            .filter_map(|tx| match &tx.meta {
634                Some(meta) if meta.status.is_err() => None,
635                _ => tx.transaction.decode(),
636            })
637            .filter_map(|tx| {
638                let instructions = tx
639                    .message
640                    .instructions()
641                    .iter()
642                    .filter_map(|compiled_instruction| {
643                        Some(RelevantInstructionWithAccounts {
644                            blob: get_account_at_index(&tx, compiled_instruction, 0)?,
645                            blober: get_account_at_index(&tx, compiled_instruction, 1)?,
646                            instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
647                        })
648                    })
649                    .filter(|instruction| {
650                        instruction.blober == blober
651                            && matches!(
652                                instruction.instruction,
653                                RelevantInstruction::FinalizeBlob(_)
654                            )
655                    })
656                    .collect::<Vec<_>>();
657
658                instructions.is_empty().then_some(
659                    instructions
660                        .iter()
661                        .map(|instruction| (instruction.blob, tx.message.clone()))
662                        .collect::<Vec<_>>(),
663                )
664            })
665            .flatten()
666            .collect::<Vec<_>>();
667
668        Ok(finalized)
669    }
670}