data_anchor_client/client/
mod.rs

1use std::{sync::Arc, time::Duration};
2
3use anchor_lang::{Discriminator, Space, prelude::Pubkey};
4use bon::Builder;
5use data_anchor_blober::{
6    CHUNK_SIZE, COMPOUND_DECLARE_TX_SIZE, COMPOUND_TX_SIZE, find_blob_address, find_blober_address,
7    find_checkpoint_address, find_checkpoint_config_address,
8    instruction::{
9        Close, ConfigureCheckpoint, DeclareBlob, DiscardBlob, FinalizeBlob, Initialize, InsertChunk,
10    },
11    state::blober::Blober,
12};
13use data_anchor_utils::{
14    compression::CompressionType,
15    decompress_and_decode_async, encode_and_compress_async,
16    encoding::{Decodable, Encodable, EncodingType},
17};
18use futures::{StreamExt, TryStreamExt};
19use jsonrpsee::http_client::HttpClient;
20use nitro_sender::{NitroSender, SuccessfulTransaction};
21use solana_commitment_config::CommitmentConfig;
22use solana_keypair::Keypair;
23use solana_rpc_client::nonblocking::rpc_client::RpcClient;
24use solana_signer::Signer;
25use tracing::{Instrument, Span, info, info_span, trace};
26
27use crate::{
28    DataAnchorClientError, DataAnchorClientResult, IndexerUrl,
29    constants::DEFAULT_CONCURRENCY,
30    fees::{Fee, FeeStrategy, Lamports},
31    helpers::{check_outcomes, get_unique_timestamp},
32    tx::{Compound, CompoundDeclare, CompoundFinalize, MessageArguments, MessageBuilder},
33    types::TransactionType,
34};
35
36mod builder;
37mod indexer_client;
38mod ledger_client;
39mod proof_client;
40
41pub use indexer_client::IndexerError;
42pub use ledger_client::ChainError;
43pub use proof_client::ProofError;
44
/// Identifier for a blober, which can be a namespace alone, a combination of payer and
/// namespace, or the blober's pubkey directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BloberIdentifier {
    /// A namespace only; [`BloberIdentifier::to_blober_address`] combines it with the payer
    /// supplied by the caller.
    Namespace(String),
    /// An explicit payer together with a namespace; this payer takes precedence over the one
    /// passed to [`BloberIdentifier::to_blober_address`].
    PayerAndNamespace { payer: Pubkey, namespace: String },
    /// The blober address itself, used as-is without any derivation.
    Pubkey(Pubkey),
}
52
/// Errors produced when building a [`BloberIdentifier`] from optional parts.
#[derive(Debug, thiserror::Error)]
pub enum BloberIdentifierError {
    /// Error indicating that the blober identifier is missing.
    #[error(
        "Missing blober identifier: either namespace, namespace and payer or blober PDA must be provided."
    )]
    MissingBloberIdentifier,
}
61
62impl TryFrom<(Option<String>, Option<Pubkey>)> for BloberIdentifier {
63    type Error = BloberIdentifierError;
64
65    fn try_from(
66        (namespace, blober_pda): (Option<String>, Option<Pubkey>),
67    ) -> Result<Self, Self::Error> {
68        match (namespace, blober_pda) {
69            (Some(namespace), None) => Ok(namespace.into()),
70            (None, Some(pubkey)) => Ok(pubkey.into()),
71            (Some(namespace), Some(payer)) => Ok((payer, namespace).into()),
72            _ => Err(BloberIdentifierError::MissingBloberIdentifier),
73        }
74    }
75}
76
77impl From<String> for BloberIdentifier {
78    fn from(namespace: String) -> Self {
79        BloberIdentifier::Namespace(namespace)
80    }
81}
82
83impl From<(Pubkey, String)> for BloberIdentifier {
84    fn from((payer, namespace): (Pubkey, String)) -> Self {
85        BloberIdentifier::PayerAndNamespace { payer, namespace }
86    }
87}
88
89impl From<Pubkey> for BloberIdentifier {
90    fn from(pubkey: Pubkey) -> Self {
91        BloberIdentifier::Pubkey(pubkey)
92    }
93}
94
95impl BloberIdentifier {
96    /// Converts the [`BloberIdentifier`] to a [`Pubkey`] representing the blober address.
97    pub fn to_blober_address(&self, program_id: Pubkey, payer: Pubkey) -> Pubkey {
98        match self {
99            BloberIdentifier::Namespace(namespace) => {
100                find_blober_address(program_id, payer, namespace)
101            }
102            BloberIdentifier::PayerAndNamespace { payer, namespace } => {
103                find_blober_address(program_id, *payer, namespace)
104            }
105            BloberIdentifier::Pubkey(pubkey) => *pubkey,
106        }
107    }
108
109    /// Returns the namespace of the blober identifier.
110    pub fn namespace(&self) -> Option<&str> {
111        match self {
112            BloberIdentifier::Namespace(namespace) => Some(namespace),
113            BloberIdentifier::PayerAndNamespace { namespace, .. } => Some(namespace),
114            BloberIdentifier::Pubkey(_) => None,
115        }
116    }
117}
118
/// Client for the data anchor blober program, bundling the payer, RPC access, the transaction
/// sender and optional indexer/proof HTTP clients.
#[derive(Builder, Clone)]
pub struct DataAnchorClient {
    /// Keypair of the transaction payer.
    #[builder(getter(name = get_payer, vis = ""))]
    pub(crate) payer: Arc<Keypair>,
    /// ID of the on-chain program; defaults to `data_anchor_blober::id()`.
    #[builder(default = data_anchor_blober::id())]
    pub(crate) program_id: Pubkey,
    /// RPC client used for account lookups, balance checks and fee conversion.
    pub(crate) rpc_client: Arc<RpcClient>,
    /// Sender used to submit the built transaction messages.
    pub(crate) nitro_sender: NitroSender,
    /// Optional indexer URL; only read through the builder getter.
    #[builder(getter(name = get_indexer, vis = ""))]
    #[allow(dead_code, reason = "Used in builder")]
    indexer: Option<IndexerUrl>,
    /// Optional HTTP client — presumably for the indexer service (see the `indexer_client`
    /// module); confirm against its usage there.
    pub(crate) indexer_client: Option<Arc<HttpClient>>,
    /// Optional HTTP client — presumably for the proof service (see the `proof_client`
    /// module); confirm against its usage there.
    pub(crate) proof_client: Option<Arc<HttpClient>>,
    /// Encoding applied by [`DataAnchorClient::encode_and_compress`]; uses the type's default
    /// when not set on the builder.
    #[builder(default)]
    pub(crate) encoding: EncodingType,
    /// Compression applied by [`DataAnchorClient::encode_and_compress`]; uses the type's default
    /// when not set on the builder.
    #[builder(default)]
    pub(crate) compression: CompressionType,
}
137
138impl DataAnchorClient {
139    /// Returns the underlaying [`RpcClient`].
140    pub fn rpc_client(&self) -> Arc<RpcClient> {
141        self.rpc_client.clone()
142    }
143
144    /// Returns the transaction payer [`Keypair`].
145    pub fn payer(&self) -> Arc<Keypair> {
146        self.payer.clone()
147    }
148
149    fn in_mock_env(&self) -> bool {
150        self.rpc_client.url().starts_with("MockSender")
151    }
152
153    async fn check_account_exists(&self, account: Pubkey) -> DataAnchorClientResult<bool> {
154        Ok(self
155            .rpc_client
156            .get_account_with_commitment(&account, CommitmentConfig::confirmed())
157            .await
158            .map(|res| res.value.is_some())?)
159    }
160
161    async fn require_balance(&self, cost: Lamports) -> DataAnchorClientResult {
162        let balance = self
163            .rpc_client
164            .get_balance_with_commitment(&self.payer.pubkey(), CommitmentConfig::confirmed())
165            .await
166            .map(|r| r.value)?;
167        let cost_u64 = cost.into_inner() as u64;
168        if balance < cost_u64 {
169            info!(
170                "Balance check failed: required={} lamports, available={} lamports, deficit={} lamports",
171                cost_u64,
172                balance,
173                cost_u64 - balance
174            );
175            return Err(ChainError::InsufficientBalance(cost_u64, balance).into());
176        }
177        trace!(
178            "Balance check passed: required={} lamports, available={} lamports, remaining={} lamports",
179            cost_u64,
180            balance,
181            balance - cost_u64
182        );
183        Ok(())
184    }
185
186    pub async fn encode_and_compress<T>(&self, data: &T) -> DataAnchorClientResult<Vec<u8>>
187    where
188        T: Encodable,
189    {
190        Ok(encode_and_compress_async(&self.encoding, &self.compression, data).await?)
191    }
192
193    pub async fn decompress_and_decode<T>(&self, bytes: &[u8]) -> DataAnchorClientResult<T>
194    where
195        T: Decodable,
196    {
197        Ok(decompress_and_decode_async(bytes).await?)
198    }
199
200    pub async fn decompress_and_decode_vec<T>(
201        &self,
202        slice_of_bytes: impl Iterator<Item = &[u8]>,
203    ) -> DataAnchorClientResult<Vec<T>>
204    where
205        T: Decodable,
206    {
207        futures::stream::iter(slice_of_bytes)
208            .map(|blob| async move { self.decompress_and_decode(blob).await })
209            .buffer_unordered(DEFAULT_CONCURRENCY)
210            .try_collect()
211            .await
212    }
213
214    /// Initializes a new [`Blober`] PDA account.
215    pub async fn initialize_blober(
216        &self,
217        fee_strategy: FeeStrategy,
218        identifier: BloberIdentifier,
219        timeout: Option<Duration>,
220    ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
221        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
222
223        let in_mock_env = self.in_mock_env();
224        if !in_mock_env && self.check_account_exists(blober).await? {
225            return Err(
226                ChainError::AccountExists(format!("Blober PDA with address {blober}")).into(),
227            );
228        }
229
230        let fee = fee_strategy
231            .convert_fee_strategy_to_fixed(
232                &self.rpc_client,
233                &[blober, self.payer.pubkey()],
234                TransactionType::InitializeBlober,
235            )
236            .in_current_span()
237            .await?;
238
239        if !in_mock_env {
240            let cost = fee
241                .total_fee()
242                .checked_add(fee.rent())
243                .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
244            self.require_balance(cost).await?;
245        }
246
247        let msg = Initialize::build_message(MessageArguments::new(
248            self.program_id,
249            blober,
250            &self.payer,
251            self.rpc_client.clone(),
252            fee,
253            (
254                identifier
255                    .namespace()
256                    .ok_or(ChainError::MissingBloberNamespace)?
257                    .to_owned(),
258                blober,
259            ),
260        ))
261        .await;
262
263        let span = info_span!(parent: Span::current(), "initialize_blober");
264        Ok(check_outcomes(
265            self.nitro_sender
266                .send(vec![(TransactionType::InitializeBlober, msg)], timeout)
267                .instrument(span)
268                .await,
269            self.rpc_client.commitment(),
270        )
271        .map_err(ChainError::InitializeBlober)?)
272    }
273
274    /// Closes a [`Blober`] PDA account.
275    pub async fn close_blober(
276        &self,
277        fee_strategy: FeeStrategy,
278        identifier: BloberIdentifier,
279        timeout: Option<Duration>,
280    ) -> DataAnchorClientResult<Vec<SuccessfulTransaction<TransactionType>>> {
281        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
282
283        let in_mock_env = self.in_mock_env();
284
285        if !in_mock_env && !self.check_account_exists(blober).await? {
286            return Err(ChainError::AccountDoesNotExist(format!(
287                "Blober PDA with address {blober}"
288            ))
289            .into());
290        }
291
292        let checkpoint = self.get_checkpoint(identifier.clone()).await?;
293
294        let checkpoint_accounts = if let Some(checkpoint) = checkpoint {
295            let Some(blober_state) = self.get_blober(identifier).await? else {
296                return Err(ChainError::AccountDoesNotExist(format!(
297                    "Blober PDA with address {blober}"
298                ))
299                .into());
300            };
301
302            let checkpointed_hash = checkpoint
303                .final_hash()
304                .map_err(|_| ChainError::CheckpointNotUpToDate)?;
305
306            if checkpoint.slot != blober_state.slot || checkpointed_hash != blober_state.hash {
307                return Err(ChainError::CheckpointNotUpToDate.into());
308            }
309
310            Some((
311                find_checkpoint_address(self.program_id, blober),
312                find_checkpoint_config_address(self.program_id, blober),
313            ))
314        } else {
315            None
316        };
317
318        let fee = fee_strategy
319            .convert_fee_strategy_to_fixed(
320                &self.rpc_client,
321                &[blober, self.payer.pubkey()],
322                TransactionType::CloseBlober,
323            )
324            .in_current_span()
325            .await?;
326
327        if !in_mock_env {
328            self.require_balance(fee.total_fee()).await?;
329        }
330
331        let msg = Close::build_message(MessageArguments::new(
332            self.program_id,
333            blober,
334            &self.payer,
335            self.rpc_client.clone(),
336            fee,
337            checkpoint_accounts,
338        ))
339        .await;
340
341        let span = info_span!(parent: Span::current(), "close_blober");
342        Ok(check_outcomes(
343            self.nitro_sender
344                .send(vec![(TransactionType::CloseBlober, msg)], timeout)
345                .instrument(span)
346                .await,
347            self.rpc_client.commitment(),
348        )
349        .map_err(ChainError::CloseBlober)?)
350    }
351
352    /// Uploads a blob of data with the given [`Blober`] PDA account.
353    /// Under the hood it creates a new [`data_anchor_blober::state::blob::Blob`] PDA which stores a
354    /// incremental hash of the chunks from the blob data. On completion of the blob upload, the
355    /// blob PDA gets closed sending it's funds back to the [`DataAnchorClient::payer`].
356    /// If the blob upload fails, the blob PDA gets discarded and the funds also get sent to the
357    /// [`DataAnchorClient::payer`].
358    pub async fn upload_blob<T>(
359        &self,
360        blob_data: &T,
361        fee_strategy: FeeStrategy,
362        namespace: &str,
363        timeout: Option<Duration>,
364    ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)>
365    where
366        T: Encodable,
367    {
368        info!(
369            "Starting blob upload: namespace='{}', original_size={} bytes",
370            namespace,
371            std::mem::size_of_val(blob_data)
372        );
373
374        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
375        let timestamp = get_unique_timestamp();
376
377        let encoded_and_compressed = self.encode_and_compress(blob_data).await?;
378
379        info!(
380            "Blob encoding/compression completed: compressed_size={} bytes, ratio={:.2}%",
381            encoded_and_compressed.len(),
382            (encoded_and_compressed.len() as f64 / std::mem::size_of_val(blob_data) as f64) * 100.0
383        );
384
385        let blob = find_blob_address(
386            self.program_id,
387            self.payer.pubkey(),
388            blober,
389            timestamp,
390            encoded_and_compressed.len(),
391        );
392
393        info!(
394            "Created blob PDA: blob={}, blober={}, timestamp={}",
395            blob, blober, timestamp
396        );
397
398        let in_mock_env = self.in_mock_env();
399        if !in_mock_env && self.check_account_exists(blob).await? {
400            return Err(ChainError::AccountExists(format!("Blob PDA with address {blob}")).into());
401        }
402
403        let fee = self
404            .estimate_fees(encoded_and_compressed.len(), blober, fee_strategy)
405            .await?;
406
407        if !in_mock_env {
408            let cost = fee
409                .total_fee()
410                .checked_add(fee.rent())
411                .ok_or_else(|| ChainError::CouldNotCalculateCost)?;
412            self.require_balance(cost).await?;
413        }
414
415        let upload_messages = self
416            .generate_messages(
417                blob,
418                timestamp,
419                &encoded_and_compressed,
420                fee_strategy,
421                blober,
422            )
423            .await?;
424
425        let res = self
426            .do_upload(upload_messages, timeout)
427            .in_current_span()
428            .await;
429
430        if let Err(DataAnchorClientError::ChainErrors(ChainError::DeclareBlob(_))) = res {
431            self.discard_blob(fee_strategy, blob, namespace, timeout)
432                .await
433        } else {
434            res.map(|r| (r, blob))
435        }
436    }
437
438    /// Discards a [`data_anchor_blober::state::blob::Blob`] PDA account registered with the provided
439    /// [`Blober`] PDA account.
440    pub async fn discard_blob(
441        &self,
442        fee_strategy: FeeStrategy,
443        blob: Pubkey,
444        namespace: &str,
445        timeout: Option<Duration>,
446    ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
447        let blober = find_blober_address(self.program_id, self.payer.pubkey(), namespace);
448
449        let in_mock_env = self.in_mock_env();
450        if !in_mock_env && !self.check_account_exists(blob).await? {
451            return Err(
452                ChainError::AccountDoesNotExist(format!("Blob PDA with address {blob}")).into(),
453            );
454        }
455
456        let fee = fee_strategy
457            .convert_fee_strategy_to_fixed(
458                &self.rpc_client,
459                &[blob, self.payer.pubkey()],
460                TransactionType::DiscardBlob,
461            )
462            .in_current_span()
463            .await?;
464
465        if !in_mock_env {
466            self.require_balance(fee.total_fee()).await?;
467        }
468
469        let msg = DiscardBlob::build_message(MessageArguments::new(
470            self.program_id,
471            blober,
472            &self.payer,
473            self.rpc_client.clone(),
474            fee,
475            blob,
476        ))
477        .in_current_span()
478        .await;
479
480        let span = info_span!(parent: Span::current(), "discard_blob");
481
482        Ok((
483            check_outcomes(
484                self.nitro_sender
485                    .send(vec![(TransactionType::DiscardBlob, msg)], timeout)
486                    .instrument(span)
487                    .await,
488                self.rpc_client.commitment(),
489            )
490            .map_err(ChainError::DiscardBlob)?,
491            blob,
492        ))
493    }
494
495    /// Configures a checkpoint for a given blober with the given authority.
496    /// This allows the authority to create checkpoints for the blober.
497    pub async fn configure_checkpoint(
498        &self,
499        fee_strategy: FeeStrategy,
500        identifier: BloberIdentifier,
501        authority: Pubkey,
502        timeout: Option<Duration>,
503    ) -> DataAnchorClientResult<(Vec<SuccessfulTransaction<TransactionType>>, Pubkey)> {
504        let blober = identifier.to_blober_address(self.program_id, self.payer.pubkey());
505
506        let checkpoint = find_checkpoint_address(self.program_id, blober);
507        let checkpoint_config = find_checkpoint_config_address(self.program_id, blober);
508
509        let in_mock_env = self.in_mock_env();
510        if !in_mock_env && !self.check_account_exists(blober).await? {
511            return Err(ChainError::AccountDoesNotExist(format!(
512                "Blober PDA with address {blober}"
513            ))
514            .into());
515        }
516
517        let fee = fee_strategy
518            .convert_fee_strategy_to_fixed(
519                &self.rpc_client,
520                &[checkpoint, checkpoint_config, self.payer.pubkey()],
521                TransactionType::ConfigureCheckpoint,
522            )
523            .in_current_span()
524            .await?;
525
526        if !in_mock_env {
527            self.require_balance(fee.total_fee()).await?;
528        }
529
530        info!(
531            "Configuring checkpoint for blober: {}, authority: {}",
532            blober, authority
533        );
534        let msg = ConfigureCheckpoint::build_message(MessageArguments::new(
535            self.program_id,
536            blober,
537            &self.payer,
538            self.rpc_client.clone(),
539            fee,
540            authority,
541        ))
542        .in_current_span()
543        .await;
544
545        let span = info_span!(parent: Span::current(), "configure_checkpoint");
546
547        Ok((
548            check_outcomes(
549                self.nitro_sender
550                    .send(vec![(TransactionType::ConfigureCheckpoint, msg)], timeout)
551                    .instrument(span)
552                    .await,
553                self.rpc_client.commitment(),
554            )
555            .map_err(ChainError::ConfigureCheckpoint)?,
556            checkpoint_config,
557        ))
558    }
559
560    /// Estimates fees for uploading a blob of the size `blob_size` with the given `priority`.
561    /// This whole functions is basically a simulation that doesn't run anything. Instead of executing transactions,
562    /// it just sums the expected fees and number of signatures.
563    ///
564    /// The [`data_anchor_blober::state::blob::Blob`] PDA account is always newly created, so for estimating compute fees
565    /// we don't even need the real keypair, any unused pubkey will do.
566    pub async fn estimate_fees(
567        &self,
568        blob_size: usize,
569        blober: Pubkey,
570        fee_strategy: FeeStrategy,
571    ) -> DataAnchorClientResult<Fee> {
572        let prioritization_fee_rate = fee_strategy
573            .convert_fee_strategy_to_fixed(
574                &self.rpc_client,
575                &[Pubkey::new_unique(), blober, self.payer.pubkey()],
576                TransactionType::Compound,
577            )
578            .await?
579            .prioritization_fee_rate;
580
581        let num_chunks = blob_size.div_ceil(CHUNK_SIZE as usize) as u16;
582
583        let (compute_unit_limit, num_signatures) = if blob_size < COMPOUND_TX_SIZE as usize {
584            (Compound::COMPUTE_UNIT_LIMIT, Compound::NUM_SIGNATURES)
585        } else if blob_size < COMPOUND_DECLARE_TX_SIZE as usize {
586            (
587                CompoundDeclare::COMPUTE_UNIT_LIMIT + FinalizeBlob::COMPUTE_UNIT_LIMIT,
588                CompoundDeclare::NUM_SIGNATURES + FinalizeBlob::NUM_SIGNATURES,
589            )
590        } else {
591            (
592                DeclareBlob::COMPUTE_UNIT_LIMIT
593                    + (num_chunks - 1) as u32 * InsertChunk::COMPUTE_UNIT_LIMIT
594                    + CompoundFinalize::COMPUTE_UNIT_LIMIT,
595                DeclareBlob::NUM_SIGNATURES
596                    + (num_chunks - 1) * InsertChunk::NUM_SIGNATURES
597                    + CompoundFinalize::NUM_SIGNATURES,
598            )
599        };
600
601        // The base Solana transaction fee = 5000.
602        // Reference link: https://solana.com/docs/core/fees#:~:text=While%20transaction%20fees%20are%20paid,of%205k%20lamports%20per%20signature.
603        let price_per_signature = Lamports::new(5000);
604
605        let blob_account_size = Blober::DISCRIMINATOR.len() + Blober::INIT_SPACE;
606
607        let fee = Fee {
608            num_signatures,
609            price_per_signature,
610            compute_unit_limit,
611            prioritization_fee_rate,
612            blob_account_size,
613        };
614
615        info!(
616            "Fee estimation: blob_size={} bytes, chunks={}, total_fee={} lamports (static: {}, prioritization: {})",
617            blob_size,
618            num_chunks,
619            fee.total_fee().into_inner(),
620            fee.static_fee().into_inner(),
621            fee.prioritization_fee().into_inner()
622        );
623
624        Ok(fee)
625    }
626}