data_anchor_client/client/
mod.rs

1use std::{sync::Arc, time::Duration};
2
3use anchor_lang::{Discriminator, Space, prelude::Pubkey};
4use bon::Builder;
5use data_anchor_blober::{
6    CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
7    find_checkpoint_address, find_checkpoint_config_address,
8    instruction::{
9        Close, ConfigureCheckpoint, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk,
10    },
11    state::blober::Blober,
12};
13use data_anchor_utils::{
14    compression::{self, DataAnchorCompressionAsync},
15    decompress_and_decode_async, encode_and_compress_async,
16    encoding::{self, DataAnchorEncoding, Decodable, Encodable},
17};
18use futures::{StreamExt, TryStreamExt};
19use jsonrpsee::http_client::HttpClient;
20use solana_commitment_config::CommitmentConfig;
21use solana_keypair::Keypair;
22use solana_rpc_client::nonblocking::rpc_client::RpcClient;
23use solana_signer::Signer;
24use tracing::{Instrument, Span, info, info_span};
25
26use crate::{
27    DataAnchorClientError, DataAnchorClientResult,
28    batch_client::{BatchClient, SuccessfulTransaction},
29    constants::DEFAULT_CONCURRENCY,
30    fees::{Fee, FeeStrategy, Lamports},
31    helpers::{check_outcomes, get_unique_timestamp},
32    tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
33    types::TransactionType,
34};
35
36mod builder;
37mod indexer_client;
38mod ledger_client;
39mod proof_client;
40
41pub use indexer_client::IndexerError;
42pub use ledger_client::ChainError;
43pub use proof_client::ProofError;
44
45/// Identifier for a blober, which can be either a combination of payer and namespace or just a pubkey.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum BloberIdentifier {
48    Namespace(String),
49    PayerAndNamespace { payer: Pubkey, namespace: String },
50    Pubkey(Pubkey),
51}
52
53#[derive(Debug, thiserror::Error)]
54pub enum BloberIdentifierError {
55    /// Error indicating that the blober identifier is missing.
56    #[error(
57        "Missing blober identifier: either namespace, namespace and payer or blober PDA must be provided."
58    )]
59    MissingBloberIdentifier,
60}
61
62impl TryFrom<(Option<String>, Option<Pubkey>)> for BloberIdentifier {
63    type Error = BloberIdentifierError;
64
65    fn try_from(
66        (namespace, blober_pda): (Option<String>, Option<Pubkey>),
67    ) -> Result<Self, Self::Error> {
68        match (namespace, blober_pda) {
69            (Some(namespace), None) => Ok(namespace.into()),
70            (None, Some(pubkey)) => Ok(pubkey.into()),
71            (Some(namespace), Some(payer)) => Ok((payer, namespace).into()),
72            _ => Err(BloberIdentifierError::MissingBloberIdentifier),
73        }
74    }
75}
76
77impl From<String> for BloberIdentifier {
78    fn from(namespace: String) -> Self {
79        BloberIdentifier::Namespace(namespace)
80    }
81}
82
83impl From<(Pubkey, String)> for BloberIdentifier {
84    fn from((payer, namespace): (Pubkey, String)) -> Self {
85        BloberIdentifier::PayerAndNamespace { payer, namespace }
86    }
87}
88
89impl From<Pubkey> for BloberIdentifier {
90    fn from(pubkey: Pubkey) -> Self {
91        BloberIdentifier::Pubkey(pubkey)
92    }
93}
94
95impl BloberIdentifier {
96    /// Converts the [`BloberIdentifier`] to a [`Pubkey`] representing the blober address.
97    pub fn to_blober_address(&self, program_id: Pubkey, payer: Pubkey) -> Pubkey {
98        match self {
99            BloberIdentifier::Namespace(namespace) => {
100                find_blober_address(program_id, payer, namespace)
101            }
102            BloberIdentifier::PayerAndNamespace { payer, namespace } => {
103                find_blober_address(program_id, *payer, namespace)
104            }
105            BloberIdentifier::Pubkey(pubkey) => *pubkey,
106        }
107    }
108
109    /// Returns the namespace of the blober identifier.
110    pub fn namespace(&self) -> Option<&str> {
111        match self {
112            BloberIdentifier::Namespace(namespace) => Some(namespace),
113            BloberIdentifier::PayerAndNamespace { namespace, .. } => Some(namespace),
114            BloberIdentifier::Pubkey(_) => None,
115        }
116    }
117}
118
119#[derive(Builder, Clone)]
120pub struct DataAnchorClient<Encoding = encoding::Default, Compression = compression::Default>
121where
122    Encoding: DataAnchorEncoding + Default,
123    Compression: DataAnchorCompressionAsync,
124{
125    #[builder(getter(name = get_payer, vis = ""))]
126    pub(crate) payer: Arc<Keypair>,
127    #[builder(default = data_anchor_blober::id())]
128    pub(crate) program_id: Pubkey,
129    pub(crate) rpc_client: Arc<RpcClient>,
130    pub(crate) batch_client: BatchClient,
131    pub(crate) indexer_client: Option<Arc<HttpClient>>,
132    pub(crate) proof_client: Option<Arc<HttpClient>>,
133    #[builder(default)]
134    pub(crate) encoding: Encoding,
135    #[builder(default)]
136    pub(crate) compression: Compression,
137}
138
139impl<Encoding, Compression> DataAnchorClient<Encoding, Compression>
140where
141    Encoding: DataAnchorEncoding + Default,
142    Compression: DataAnchorCompressionAsync,
143{
144    /// Returns the underlaying [`RpcClient`].
145    pub fn rpc_client(&self) -> Arc<RpcClient> {
146        self.rpc_client.clone()
147    }
148
149    /// Returns the transaction payer [`Keypair`].
150    pub fn payer(&self) -> Arc<Keypair> {
151        self.payer.clone()
152    }
153
154    fn in_mock_env(&self) -> bool {
155        self.rpc_client.url().starts_with("MockSender")
156    }
157
158    async fn check_account_exists(&self, account: Pubkey) -> DataAnchorClientResult<bool> {
159        Ok(self
160            .rpc_client
161            .get_account_with_commitment(&account, CommitmentConfig::confirmed())
162            .await
163            .map(|res| res.value.is_some())?)
164    }
165
166    async fn require_balance(&self, cost: Lamports) -> DataAnchorClientResult {
167        let balance = self
168            .rpc_client
169            .get_balance_with_commitment(&self.payer.pubkey(), CommitmentConfig::confirmed())
170            .await
171            .map(|r| r.value)?;
172        let cost_u64 = cost.into_inner() as u64;
173        if balance < cost_u64 {
174            return Err(ChainError::InsufficientBalance(cost_u64, balance).into());
175        }
176        Ok(())
177    }
178
179    pub async fn encode_and_compress<T>(&self, data: &T) -> DataAnchorClientResult<Vec<u8>>
180    where
181        T: Encodable,
182    {
183        Ok(encode_and_compress_async(&self.encoding, &self.compression, data).await?)
184    }
185
186    pub async fn decompress_and_decode<T>(&self, bytes: &[u8]) -> DataAnchorClientResult<T>
187    where
188        T: Decodable,
189    {
190        Ok(decompress_and_decode_async(&self.encoding, &self.compression, bytes).await?)
191    }
192
193    pub async fn decompress_and_decode_vec<T>(
194        &self,
195        slice_of_bytes: impl Iterator<Item = &[u8]>,
196    ) -> DataAnchorClientResult<Vec<T>>
197    where
198        T: Decodable,
199    {
200        futures::stream::iter(slice_of_bytes)
201            .map(|blob| async move { self.decompress_and_decode(blob).await })
202            .buffer_unordered(DEFAULT_CONCURRENCY)
203            .try_collect()
204            .await
205    }
206
207    /// Initializes a new [`Blober`] PDA account.
208    pub async fn initialize_blober(
209        &self,
210        fee_strategy: FeeStrategy,
211        identifier: BloberIdentifier,
212        timeout: Option<Duration>,
213    ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
214        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
215
216        let in_mock_env = self.in_mock_env();
217        if !in_mock_env && self.check_account_exists(blober).await? {
218            return Err(
219                ChainError::AccountExists(format!("Blober PDA with address {blober}")).into(),
220            );
221        }
222
223        let fee = fee_strategy
224            .convert_fee_strategy_to_fixed(
225                &self.rpc_client,
226                &[blober, self.payer.pubkey()],
227                TransactionType::InitializeBlober,
228            )
229            .in_current_span()
230            .await?;
231
232        if !in_mock_env {
233            let cost = fee
234                .total_fee()
235                .checked_add(fee.rent())
236                .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
237            self.require_balance(cost).await?;
238        }
239
240        let msg = Initialize::build_message(MessageArguments::new(
241            self.program_id,
242            blober,
243            &self.payer,
244            self.rpc_client.clone(),
245            fee,
246            (
247                identifier
248                    .namespace()
249                    .ok_or(ChainError::MissingBloberNamespace)?
250                    .to_owned(),
251                blober,
252            ),
253        ))
254        .await
255        .expect("infallible with a fixed fee strategy");
256
257        let span = info_span!(parent: Span::current(), "initialize_blober");
258        Ok(check_outcomes(
259            self.batch_client
260                .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
261                .instrument(span)
262                .await,
263        )
264        .map_err(ChainError::InitializeBlober)?)
265    }
266
267    /// Closes a [`Blober`] PDA account.
268    pub async fn close_blober(
269        &self,
270        fee_strategy: FeeStrategy,
271        identifier: BloberIdentifier,
272        timeout: Option<Duration>,
273    ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
274        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
275
276        let in_mock_env = self.in_mock_env();
277
278        if !in_mock_env && !self.check_account_exists(blober).await? {
279            return Err(ChainError::AccountDoesNotExist(format!(
280                "Blober PDA with address {blober}"
281            ))
282            .into());
283        }
284
285        let checkpoint = self.get_checkpoint(identifier.clone()).await?;
286
287        let checkpoint_accounts = if let Some(checkpoint) = checkpoint {
288            let Some(blober_state) = self.get_blober(identifier).await? else {
289                return Err(ChainError::AccountDoesNotExist(format!(
290                    "Blober PDA with address {blober}"
291                ))
292                .into());
293            };
294
295            let checkpointed_hash = checkpoint
296                .final_hash()
297                .map_err(|_| ChainError::CheckpointNotUpToDate)?;
298
299            if checkpoint.slot != blober_state.slot || checkpointed_hash != blober_state.hash {
300                return Err(ChainError::CheckpointNotUpToDate.into());
301            }
302
303            Some((
304                find_checkpoint_address(self.program_id, blober),
305                find_checkpoint_config_address(self.program_id, blober),
306            ))
307        } else {
308            None
309        };
310
311        let fee = fee_strategy
312            .convert_fee_strategy_to_fixed(
313                &self.rpc_client,
314                &[blober, self.payer.pubkey()],
315                TransactionType::CloseBlober,
316            )
317            .in_current_span()
318            .await?;
319
320        if !in_mock_env {
321            self.require_balance(fee.total_fee()).await?;
322        }
323
324        let msg = Close::build_message(MessageArguments::new(
325            self.program_id,
326            blober,
327            &self.payer,
328            self.rpc_client.clone(),
329            fee,
330            checkpoint_accounts,
331        ))
332        .await
333        .expect("infallible with a fixed fee strategy");
334
335        let span = info_span!(parent: Span::current(), "close_blober");
336        Ok(check_outcomes(
337            self.batch_client
338                .send(vec![(TransactionType::CloseBlober, msg)], timeout)
339                .instrument(span)
340                .await,
341        )
342        .map_err(ChainError::CloseBlober)?)
343    }
344
345    /// Uploads a blob of data with the given [`Blober`] PDA account.
346    /// Under the hood it creates a new [`data_anchor_blober::state::blob::Blob`] PDA which stores a
347    /// incremental hash of the chunks from the blob data. On completion of the blob upload, the
348    /// blob PDA gets closed sending it's funds back to the [`DataAnchorClient::payer`].
349    /// If the blob upload fails, the blob PDA gets discarded and the funds also get sent to the
350    /// [`DataAnchorClient::payer`].
351    pub async fn upload_blob<T>(
352        &self,
353        blob_data: &T,
354        fee_strategy: FeeStrategy,
355        namespace: &str,
356        timeout: Option<Duration>,
357    ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)>
358    where
359        T: Encodable,
360    {
361        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
362        let timestamp = get_unique_timestamp();
363
364        let encoded_and_compressed = self.encode_and_compress(blob_data).await?;
365
366        let blob = find_blob_address(
367            self.program_id,
368            self.payer.pubkey(),
369            blober,
370            timestamp,
371            encoded_and_compressed.len(),
372        );
373
374        let in_mock_env = self.in_mock_env();
375        if !in_mock_env && self.check_account_exists(blob).await? {
376            return Err(ChainError::AccountExists(format!("Blob PDA with address {blob}")).into());
377        }
378
379        let fee = self
380            .estimate_fees(encoded_and_compressed.len(), blober, fee_strategy)
381            .await?;
382
383        if !in_mock_env {
384            let cost = fee
385                .total_fee()
386                .checked_add(fee.rent())
387                .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
388            self.require_balance(cost).await?;
389        }
390
391        let upload_messages = self
392            .generate_messages(
393                blob,
394                timestamp,
395                &encoded_and_compressed,
396                fee_strategy,
397                blober,
398            )
399            .await?;
400
401        let res = self
402            .do_upload(upload_messages, timeout)
403            .in_current_span()
404            .await;
405
406        if let Err(DataAnchorClientError::ChainErrors(ChainError::DeclareBlob(_))) = res {
407            self.discard_blob(fee_strategy, blob, namespace, timeout)
408                .await
409        } else {
410            res.map(|r| (r, blob))
411        }
412    }
413
414    /// Discards a [`data_anchor_blober::state::blob::Blob`] PDA account registered with the provided
415    /// [`Blober`] PDA account.
416    pub async fn discard_blob(
417        &self,
418        fee_strategy: FeeStrategy,
419        blob: Pubkey,
420        namespace: &str,
421        timeout: Option<Duration>,
422    ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
423        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
424
425        let in_mock_env = self.in_mock_env();
426        if !in_mock_env && !self.check_account_exists(blob).await? {
427            return Err(
428                ChainError::AccountDoesNotExist(format!("Blob PDA with address {blob}")).into(),
429            );
430        }
431
432        let fee = fee_strategy
433            .convert_fee_strategy_to_fixed(
434                &self.rpc_client,
435                &[blob, self.payer.pubkey()],
436                TransactionType::DiscardBlob,
437            )
438            .in_current_span()
439            .await?;
440
441        if !in_mock_env {
442            self.require_balance(fee.total_fee()).await?;
443        }
444
445        let msg = DiscardBlob::build_message(MessageArguments::new(
446            self.program_id,
447            blober,
448            &self.payer,
449            self.rpc_client.clone(),
450            fee,
451            blob,
452        ))
453        .in_current_span()
454        .await
455        .expect("infallible with a fixed fee strategy");
456
457        let span = info_span!(parent: Span::current(), "discard_blob");
458
459        Ok((
460            check_outcomes(
461                self.batch_client
462                    .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
463                    .instrument(span)
464                    .await,
465            )
466            .map_err(ChainError::DiscardBlob)?,
467            blob,
468        ))
469    }
470
471    /// Configures a checkpoint for a given blober with the given authority.
472    /// This allows the authority to create checkpoints for the blober.
473    pub async fn configure_checkpoint(
474        &self,
475        fee_strategy: FeeStrategy,
476        identifier: BloberIdentifier,
477        authority: Pubkey,
478        timeout: Option<Duration>,
479    ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
480        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
481
482        let checkpoint = find_checkpoint_address(self.program_id, blober);
483        let checkpoint_config = find_checkpoint_config_address(self.program_id, blober);
484
485        let in_mock_env = self.in_mock_env();
486        if !in_mock_env && !self.check_account_exists(blober).await? {
487            return Err(ChainError::AccountDoesNotExist(format!(
488                "Blober PDA with address {blober}"
489            ))
490            .into());
491        }
492
493        let fee = fee_strategy
494            .convert_fee_strategy_to_fixed(
495                &self.rpc_client,
496                &[checkpoint, checkpoint_config, self.payer.pubkey()],
497                TransactionType::ConfigureCheckpoint,
498            )
499            .in_current_span()
500            .await?;
501
502        if !in_mock_env {
503            self.require_balance(fee.total_fee()).await?;
504        }
505
506        info!(
507            "Configuring checkpoint for blober: {}, authority: {}",
508            blober, authority
509        );
510        let msg = ConfigureCheckpoint::build_message(MessageArguments::new(
511            self.program_id,
512            blober,
513            &self.payer,
514            self.rpc_client.clone(),
515            fee,
516            authority,
517        ))
518        .in_current_span()
519        .await
520        .expect("infallible with a fixed fee strategy");
521
522        let span = info_span!(parent: Span::current(), "configure_checkpoint");
523
524        Ok((
525            check_outcomes(
526                self.batch_client
527                    .send(vec![(TransactionType::ConfigureCheckpoint, msg)], timeout)
528                    .instrument(span)
529                    .await,
530            )
531            .map_err(ChainError::ConfigureCheckpoint)?,
532            checkpoint_config,
533        ))
534    }
535
536    /// Estimates fees for uploading a blob of the size `blob_size` with the given `priority`.
537    /// This whole functions is basically a simulation that doesn't run anything. Instead of executing transactions,
538    /// it just sums the expected fees and number of signatures.
539    ///
540    /// The [`data_anchor_blober::state::blob::Blob`] PDA account is always newly created, so for estimating compute fees
541    /// we don't even need the real keypair, any unused pubkey will do.
542    pub async fn estimate_fees(
543        &self,
544        blob_size: usize,
545        blober: Pubkey,
546        fee_strategy: FeeStrategy,
547    ) -> DataAnchorClientResult<Fee> {
548        let prioritization_fee_rate = fee_strategy
549            .convert_fee_strategy_to_fixed(
550                &self.rpc_client,
551                &[Pubkey::new_unique(), blober, self.payer.pubkey()],
552                TransactionType::Compound,
553            )
554            .await?
555            .prioritization_fee_rate;
556
557        let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
558
559        let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
560            (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
561        } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
562            (
563                CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
564                CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
565            )
566        } else {
567            (
568                DeclareBlob::COMPUTE_UNIT_LIMIT
569                    + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
570                    + CompoundFinalize::COMPUTE_UNIT_LIMIT,
571                DeclareBlob::NUM_SIGNATURES
572                    + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
573                    + CompoundFinalize::NUM_SIGNATURES,
574            )
575        };
576
577        // The base Solana transaction fee = 5000.
578        // Reference link: https://solana.com/docs/core/fees#:~:text=While%20transaction%20fees%20are%20paid,of%205k%20lamports%20per%20signature.
579        let price_per_signature = Lamports::new(5000);
580
581        let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
582
583        Ok(Fee {
584            num_signatures,
585            price_per_signature,
586            compute_unit_limit,
587            prioritization_fee_rate,
588            blob_account_size,
589        })
590    }
591}