nitro_da_client/
blober_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    str::FromStr,
4    sync::Arc,
5    time::Duration,
6};
7
8use anchor_lang::{Discriminator, Space};
9use blober_client_builder::{IsSet, IsUnset, SetHeliusFeeEstimate, SetIndexerClient};
10use bon::Builder;
11use futures::StreamExt;
12use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
13use nitro_da_blober::{
14    CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
15    instruction::{Close, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk},
16    state::blober::Blober,
17};
18use nitro_da_indexer_api::{
19    CompoundProof, IndexerRpcClient, RelevantInstruction, RelevantInstructionWithAccounts,
20    extract_relevant_instructions, get_account_at_index,
21};
22use solana_cli_config::Config;
23use solana_client::rpc_config::RpcTransactionConfig;
24use solana_rpc_client::nonblocking::rpc_client::RpcClient;
25use solana_rpc_client_api::config::RpcBlockConfig;
26use solana_sdk::{
27    commitment_config::CommitmentConfig,
28    message::VersionedMessage,
29    pubkey::Pubkey,
30    signature::{Keypair, Signature},
31    signer::Signer,
32};
33use solana_transaction_status::{EncodedConfirmedBlock, UiTransactionEncoding};
34use tracing::{Instrument, Span, info_span};
35
36use crate::{
37    BloberClientError, BloberClientResult, LedgerDataBlobError,
38    batch_client::{BatchClient, SuccessfulTransaction},
39    constants::{DEFAULT_CONCURRENCY, DEFAULT_LOOKBACK_SLOTS},
40    fees::{Fee, FeeStrategy, Lamports, Priority},
41    helpers::{
42        check_outcomes, filter_relevant_instructions, get_blob_data_from_instructions,
43        get_unique_timestamp,
44    },
45    tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
46    types::{IndexerError, TransactionType, UploadBlobError},
47};
48
/// Client for working with blober accounts: initializing and closing them, uploading and
/// discarding blobs, and reading blob data back from the ledger or from an indexer.
///
/// Instances are created through the generated builder (`BloberClient::builder()`).
#[derive(Builder, Clone)]
pub struct BloberClient {
    /// Keypair passed to the message builders as the payer; also exposed via
    /// [`BloberClient::payer`].
    #[builder(getter(name = get_payer, vis = ""))]
    pub(crate) payer: Arc<Keypair>,
    /// Program id used when deriving blober/blob addresses and when building messages.
    pub(crate) program_id: Pubkey,
    /// RPC client used for ledger queries and handed to the message builders.
    pub(crate) rpc_client: Arc<RpcClient>,
    /// Client used to send the built messages.
    pub(crate) batch_client: BatchClient,
    // Optional for the sake of testing, because in some tests indexer client is not used
    pub(crate) indexer_client: Option<Arc<WsClient>>,
    /// Flag forwarded to priority-fee estimation and to the message builders. Defaults to
    /// `false`.
    #[builder(default = false)]
    pub(crate) helius_fee_estimate: bool,
}
61
62impl<State: blober_client_builder::State> BloberClientBuilder<State> {
63    /// Adds an indexer client to the builder based on the given indexer URL.
64    ///
65    /// # Example
66    ///
67    /// ```rust
68    /// use blober_client::BloberClient;
69    ///
70    /// let builder_with_indexer = BloberClient::builder()
71    ///     .indexer_from_url("ws://localhost:8080")
72    ///     .await?;
73    /// ```
74    pub async fn indexer_from_url(
75        self,
76        indexer_url: &str,
77    ) -> BloberClientResult<BloberClientBuilder<SetIndexerClient<State>>>
78    where
79        State::IndexerClient: IsUnset,
80    {
81        let indexer_client = WsClientBuilder::new().build(indexer_url).await?;
82        Ok(self.indexer_client(Arc::new(indexer_client)))
83    }
84
85    /// Builds a new `BloberClient` with an RPC client and a batch client built from the given
86    /// Solana cli [`Config`].
87    ///
88    /// # Example
89    ///
90    /// ```rust
91    /// use std::sync::Arc;
92    ///
93    /// use blober_client::{BloberClient};
94    /// use solana_cli_config::Config;
95    /// use solana_sdk::{pubkey::Pubkey, signature::Keypair};
96    ///
97    /// let payer = Arc::new(Keypair::new());
98    /// let program_id = Pubkey::new_unique();
99    /// let solana_config = Config::default();
100    /// let client = BloberClient::builder()
101    ///     .payer(payer)
102    ///     .program_id(program_id)
103    ///     .build_with_config(solana_config)
104    ///     .await?;
105    /// ```
106    pub async fn build_with_config(self, solana_config: Config) -> BloberClientResult<BloberClient>
107    where
108        State::Payer: IsSet,
109        State::ProgramId: IsSet,
110        State::RpcClient: IsUnset,
111        State::BatchClient: IsUnset,
112    {
113        let rpc_client = Arc::new(RpcClient::new_with_commitment(
114            solana_config.json_rpc_url.clone(),
115            CommitmentConfig::from_str(&solana_config.commitment)?,
116        ));
117        let payer = self.get_payer().clone();
118        Ok(self
119            .rpc_client(rpc_client.clone())
120            .batch_client(BatchClient::new(rpc_client.clone(), vec![payer.clone()]).await?)
121            .build())
122    }
123
124    pub fn with_helius_fee_estimate(self) -> BloberClientBuilder<SetHeliusFeeEstimate<State>>
125    where
126        State::HeliusFeeEstimate: IsUnset,
127    {
128        self.helius_fee_estimate(true)
129    }
130}
131
132impl BloberClient {
133    /// Returns the underlaying [`RpcClient`].
134    pub fn rpc_client(&self) -> Arc<RpcClient> {
135        self.rpc_client.clone()
136    }
137
138    /// Returns the transaction payer [`Keypair`].
139    pub fn payer(&self) -> Arc<Keypair> {
140        self.payer.clone()
141    }
142
143    /// Initializes a new [`Blober`] PDA account.
144    pub async fn initialize_blober(
145        &self,
146        fee_strategy: FeeStrategy,
147        namespace: String,
148        timeout: Option<Duration>,
149    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
150        let blober = find_blober_address(self.program_id, self.payer.pubkey(), &namespace);
151
152        let fee_strategy = self
153            .convert_fee_strategy_to_fixed(
154                fee_strategy,
155                &[blober],
156                TransactionType::InitializeBlober,
157            )
158            .in_current_span()
159            .await?;
160
161        let msg = Initialize::build_message(MessageArguments::new(
162            self.program_id,
163            blober,
164            &self.payer,
165            self.rpc_client.clone(),
166            fee_strategy,
167            self.helius_fee_estimate,
168            (namespace, blober),
169        ))
170        .await
171        .expect("infallible with a fixed fee strategy");
172
173        let span = info_span!(parent: Span::current(), "initialize_blober");
174        Ok(check_outcomes(
175            self.batch_client
176                .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
177                .instrument(span)
178                .await,
179        )
180        .map_err(UploadBlobError::InitializeBlober)?)
181    }
182
183    /// Closes a [`Blober`] PDA account.
184    pub async fn close_blober(
185        &self,
186        fee_strategy: FeeStrategy,
187        blober: Pubkey,
188        timeout: Option<Duration>,
189    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
190        let fee_strategy = self
191            .convert_fee_strategy_to_fixed(fee_strategy, &[blober], TransactionType::CloseBlober)
192            .in_current_span()
193            .await?;
194
195        let msg = Close::build_message(MessageArguments::new(
196            self.program_id,
197            blober,
198            &self.payer,
199            self.rpc_client.clone(),
200            fee_strategy,
201            self.helius_fee_estimate,
202            (),
203        ))
204        .await
205        .expect("infallible with a fixed fee strategy");
206
207        let span = info_span!(parent: Span::current(), "close_blober");
208        Ok(check_outcomes(
209            self.batch_client
210                .send(vec![(TransactionType::CloseBlober, msg)], timeout)
211                .instrument(span)
212                .await,
213        )
214        .map_err(UploadBlobError::CloseBlober)?)
215    }
216
217    /// Uploads a blob of data with the given [`Blober`] PDA account.
218    /// Under the hood it creates a new [`blober::state::blob::Blob`] PDA which stores a incremental hash of the chunks
219    /// from the blob data. On completion of the blob upload, the blob PDA gets closed sending it's
220    /// funds back to the [`BloberClient::payer`].
221    /// If the blob upload fails, the blob PDA gets discarded and the funds also get sent to the
222    /// [`BloberClient::payer`].
223    pub async fn upload_blob(
224        &self,
225        blob_data: &[u8],
226        fee_strategy: FeeStrategy,
227        blober: Pubkey,
228        timeout: Option<Duration>,
229    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
230        let timestamp = get_unique_timestamp();
231
232        let blob = find_blob_address(
233            self.program_id,
234            self.payer.pubkey(),
235            blober,
236            timestamp,
237            blob_data.len(),
238        );
239
240        let upload_messages = self
241            .generate_messages(blob, timestamp, blob_data, fee_strategy, blober)
242            .await?;
243
244        let res = self
245            .do_upload(upload_messages, timeout)
246            .in_current_span()
247            .await;
248
249        if let Err(BloberClientError::UploadBlob(UploadBlobError::DeclareBlob(_))) = res {
250            self.discard_blob(fee_strategy, blob, blober, timeout).await
251        } else {
252            res
253        }
254    }
255
256    /// Discards a [`blober::state::blob::Blob`] PDA account registered with the provided
257    /// [`Blober`] PDA account.
258    pub async fn discard_blob(
259        &self,
260        fee_strategy: FeeStrategy,
261        blob: Pubkey,
262        blober: Pubkey,
263        timeout: Option<Duration>,
264    ) -> BloberClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
265        let fee_strategy = self
266            .convert_fee_strategy_to_fixed(fee_strategy, &[blob], TransactionType::DiscardBlob)
267            .in_current_span()
268            .await?;
269
270        let msg = DiscardBlob::build_message(MessageArguments::new(
271            self.program_id,
272            blober,
273            &self.payer,
274            self.rpc_client.clone(),
275            fee_strategy,
276            self.helius_fee_estimate,
277            blob,
278        ))
279        .in_current_span()
280        .await
281        .expect("infallible with a fixed fee strategy");
282
283        let span = info_span!(parent: Span::current(), "discard_blob");
284
285        Ok(check_outcomes(
286            self.batch_client
287                .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
288                .instrument(span)
289                .await,
290        )
291        .map_err(UploadBlobError::DiscardBlob)?)
292    }
293
294    /// Estimates fees for uploading a blob of the size `blob_size` with the given `priority`.
295    /// This whole functions is basically a simulation that doesn't run anything. Instead of executing transactions,
296    /// it just sums the expected fees and number of signatures.
297    ///
298    /// The [`blober::state::blob::Blob`] PDA account is always newly created, so for estimating compute fees
299    /// we don't even need the real keypair, any unused pubkey will do.
300    pub async fn estimate_fees(
301        &self,
302        blob_size: usize,
303        blober: Pubkey,
304        priority: Priority,
305    ) -> BloberClientResult<Fee> {
306        let prioritization_fee_rate = priority
307            .get_priority_fee_estimate(
308                &self.rpc_client,
309                &[Pubkey::new_unique(), blober, self.payer.pubkey()],
310                self.helius_fee_estimate,
311            )
312            .await?;
313
314        let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
315
316        let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
317            (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
318        } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
319            (
320                CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
321                CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
322            )
323        } else {
324            (
325                DeclareBlob::COMPUTE_UNIT_LIMIT
326                    + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
327                    + CompoundFinalize::COMPUTE_UNIT_LIMIT,
328                DeclareBlob::NUM_SIGNATURES
329                    + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
330                    + CompoundFinalize::NUM_SIGNATURES,
331            )
332        };
333
334        // The base Solana transaction fee = 5000.
335        // Reference link: https://solana.com/docs/core/fees#:~:text=While%20transaction%20fees%20are%20paid,of%205k%20lamports%20per%20signature.
336        let price_per_signature = Lamports::new(5000);
337
338        let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
339
340        Ok(Fee {
341            num_signatures,
342            price_per_signature,
343            compute_unit_limit,
344            prioritization_fee_rate,
345            blob_account_size,
346        })
347    }
348
349    /// Returns the raw blob data from the ledger for the given signatures.
350    pub async fn get_ledger_blobs_from_signatures(
351        &self,
352        blober: Pubkey,
353        signatures: Vec<Signature>,
354    ) -> BloberClientResult<Vec<u8>> {
355        let relevant_transactions = futures::stream::iter(signatures)
356            .map(|signature| async move {
357                self.rpc_client
358                    .get_transaction_with_config(
359                        &signature,
360                        RpcTransactionConfig {
361                            commitment: Some(self.rpc_client.commitment()),
362                            encoding: Some(UiTransactionEncoding::Base58),
363                            ..Default::default()
364                        },
365                    )
366                    .await
367            })
368            .buffer_unordered(DEFAULT_CONCURRENCY)
369            .collect::<Vec<_>>()
370            .await
371            .into_iter()
372            .collect::<Result<Vec<_>, _>>()?;
373
374        let relevant_instructions = extract_relevant_instructions(
375            &relevant_transactions
376                .iter()
377                .filter_map(|encoded| match &encoded.transaction.meta {
378                    Some(meta) if meta.status.is_err() => None,
379                    _ => encoded.transaction.transaction.decode(),
380                })
381                .collect::<Vec<_>>(),
382        );
383
384        let declares = relevant_instructions
385            .iter()
386            .filter_map(|instruction| {
387                (instruction.blober == blober
388                    && matches!(instruction.instruction, RelevantInstruction::DeclareBlob(_)))
389                .then_some(instruction.blob)
390            })
391            .collect::<Vec<Pubkey>>();
392
393        let Some(blob) = declares.first() else {
394            return Err(LedgerDataBlobError::DeclareNotFound.into());
395        };
396
397        if declares.len() > 1 {
398            return Err(LedgerDataBlobError::MultipleDeclares.into());
399        }
400
401        if relevant_instructions
402            .iter()
403            .filter(|instruction| {
404                matches!(
405                    instruction.instruction,
406                    RelevantInstruction::FinalizeBlob(_)
407                )
408            })
409            .count()
410            > 1
411        {
412            return Err(LedgerDataBlobError::MultipleFinalizes.into());
413        }
414
415        Ok(get_blob_data_from_instructions(
416            &relevant_instructions,
417            blober,
418            *blob,
419        )?)
420    }
421
422    /// Fetches all blobs finalized in a given slot from the ledger.
423    pub async fn get_ledger_blobs(
424        &self,
425        slot: u64,
426        blober: Pubkey,
427        lookback_slots: Option<u64>,
428    ) -> BloberClientResult<Vec<Vec<u8>>> {
429        let block_config = RpcBlockConfig {
430            commitment: Some(self.rpc_client.commitment()),
431            encoding: Some(UiTransactionEncoding::Base58),
432            ..Default::default()
433        };
434        let block = self
435            .rpc_client
436            .get_block_with_config(slot, block_config)
437            .await?;
438
439        let Some(transactions) = block.transactions else {
440            // If there are no transactions in the block, that means there are no blobs to fetch.
441            return Ok(Vec::new());
442        };
443
444        let relevant_instructions = extract_relevant_instructions(
445            &transactions
446                .iter()
447                .filter_map(|tx| match &tx.meta {
448                    Some(meta) if meta.status.is_err() => None,
449                    _ => tx.transaction.decode(),
450                })
451                .collect::<Vec<_>>(),
452        );
453        let finalized_blobs = relevant_instructions
454            .iter()
455            .filter_map(|instruction| {
456                (instruction.blober == blober
457                    && matches!(
458                        instruction.instruction,
459                        RelevantInstruction::FinalizeBlob(_)
460                    ))
461                .then_some(instruction.blob)
462            })
463            .collect::<HashSet<Pubkey>>();
464
465        let mut relevant_instructions_map = HashMap::new();
466        filter_relevant_instructions(
467            relevant_instructions,
468            &finalized_blobs,
469            &mut relevant_instructions_map,
470        );
471
472        let mut blobs = HashMap::with_capacity(finalized_blobs.len());
473        for blob in &finalized_blobs {
474            let instructions = relevant_instructions_map
475                .get(blob)
476                .expect("This should never happen since we at least have the finalize instruction");
477
478            if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob) {
479                blobs.insert(blob, blob_data);
480            }
481        }
482
483        // If all blobs are found, return them.
484        if blobs.len() == finalized_blobs.len() {
485            return Ok(blobs.values().cloned().collect());
486        }
487
488        let lookback_slots = lookback_slots.unwrap_or(DEFAULT_LOOKBACK_SLOTS);
489
490        let block_slots = self
491            .rpc_client
492            .get_blocks_with_commitment(
493                slot - lookback_slots,
494                Some(slot - 1),
495                self.rpc_client.commitment(),
496            )
497            .await?;
498
499        for slot in block_slots.into_iter().rev() {
500            let block = self
501                .rpc_client
502                .get_block_with_config(slot, block_config)
503                .await?;
504            let Some(transactions) = block.transactions else {
505                // If there are no transactions in the block, go to the next block.
506                continue;
507            };
508            let new_relevant_instructions = extract_relevant_instructions(
509                &transactions
510                    .iter()
511                    .filter_map(|tx| match &tx.meta {
512                        Some(meta) if meta.status.is_err() => None,
513                        _ => tx.transaction.decode(),
514                    })
515                    .collect::<Vec<_>>(),
516            );
517            filter_relevant_instructions(
518                new_relevant_instructions,
519                &finalized_blobs,
520                &mut relevant_instructions_map,
521            );
522            for blob in &finalized_blobs {
523                if blobs.contains_key(blob) {
524                    continue;
525                }
526                let instructions = relevant_instructions_map.get(blob).expect(
527                    "This should never happen since we at least have the finalize instruction",
528                );
529                println!("total {}", instructions.len());
530
531                if let Ok(blob_data) = get_blob_data_from_instructions(instructions, blober, *blob)
532                {
533                    blobs.insert(blob, blob_data);
534                }
535            }
536            if blobs.len() == finalized_blobs.len() {
537                break;
538            }
539        }
540
541        Ok(blobs.values().cloned().collect())
542    }
543
544    /// Fetches all blobs for a given slot from the [`IndexerRpcClient`].
545    pub async fn get_blobs(&self, slot: u64, blober: Pubkey) -> BloberClientResult<Vec<Vec<u8>>> {
546        loop {
547            let blobs = self
548                .indexer()
549                .get_blobs(blober, slot)
550                .await
551                .map_err(|e| IndexerError::Blobs(slot, e.to_string()))?;
552            if let Some(blobs) = blobs {
553                return Ok(blobs);
554            }
555            tokio::time::sleep(Duration::from_millis(100)).await;
556        }
557    }
558
559    /// Fetches compound proof for a given slot from the [`IndexerRpcClient`].
560    pub async fn get_slot_proof(
561        &self,
562        slot: u64,
563        blober: Pubkey,
564    ) -> BloberClientResult<CompoundProof> {
565        loop {
566            let proof = self
567                .indexer()
568                .get_proof(blober, slot)
569                .await
570                .map_err(|e| IndexerError::Proof(slot, e.to_string()))?;
571            if let Some(proofs) = proof {
572                return Ok(proofs);
573            }
574            tokio::time::sleep(Duration::from_millis(100)).await;
575        }
576    }
577
578    /// Fetches blob messages for a given slot
579    /// Returns a tuple of ([`Pubkey`], [`VersionedMessage`]) where the Pubkey is the address of
580    /// the [`blober::state::blob::Blob`] account and the VersionedMessage is the message that
581    /// included the [`blober::instruction::FinalizeBlob`] instruction.
582    pub async fn get_blob_messages(
583        &self,
584        slot: u64,
585        blober: Pubkey,
586    ) -> BloberClientResult<Vec<(Pubkey, VersionedMessage)>> {
587        let block: EncodedConfirmedBlock = self
588            .rpc_client
589            .get_block_with_config(
590                slot,
591                RpcBlockConfig {
592                    commitment: Some(self.rpc_client.commitment()),
593                    encoding: Some(UiTransactionEncoding::Base58),
594                    ..Default::default()
595                },
596            )
597            .await?
598            .into();
599
600        let finalized = block
601            .transactions
602            .iter()
603            .filter_map(|tx| match &tx.meta {
604                Some(meta) if meta.status.is_err() => None,
605                _ => tx.transaction.decode(),
606            })
607            .filter_map(|tx| {
608                let instructions = tx
609                    .message
610                    .instructions()
611                    .iter()
612                    .filter_map(|compiled_instruction| {
613                        Some(RelevantInstructionWithAccounts {
614                            blob: get_account_at_index(&tx, compiled_instruction, 0)?,
615                            blober: get_account_at_index(&tx, compiled_instruction, 1)?,
616                            instruction: RelevantInstruction::try_from_slice(compiled_instruction)?,
617                        })
618                    })
619                    .filter(|instruction| {
620                        instruction.blober == blober
621                            && matches!(
622                                instruction.instruction,
623                                RelevantInstruction::FinalizeBlob(_)
624                            )
625                    })
626                    .collect::<Vec<_>>();
627
628                instructions.is_empty().then_some(
629                    instructions
630                        .iter()
631                        .map(|instruction| (instruction.blob, tx.message.clone()))
632                        .collect::<Vec<_>>(),
633                )
634            })
635            .flatten()
636            .collect::<Vec<_>>();
637
638        Ok(finalized)
639    }
640}